Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5581d3f1ce | |||
| 8ac1dbf4a8 | |||
| c3d237766d | |||
| 5bc68c95f6 | |||
| 5dca5b5d28 | |||
| 9c9ff6eeba | |||
| d667545185 | |||
| 235bbf9881 | |||
| 30200a4e49 | |||
| eff2c3b7d3 | |||
| 3d79002dfa | |||
| 3d60008965 | |||
| cd0040925a | |||
| fdac449a48 | |||
| 40f5b5fe9e | |||
| 250896cdfa | |||
| 5e8b998a11 | |||
| 53999b2825 |
+22
-8
@@ -150,15 +150,26 @@ def is_deferrable_error(exc: Exception) -> bool:
|
||||
return isinstance(exc, httpx.TransportError)
|
||||
|
||||
|
||||
async def call_deep_or_defer(client: "AIClient", prompt: str, system: str | None = None) -> str:
|
||||
async def call_deep_or_defer(
|
||||
client: "AIClient",
|
||||
prompt: str,
|
||||
system: str | None = None,
|
||||
cfg: "AIModelConfig | None" = None,
|
||||
) -> str:
|
||||
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
|
||||
|
||||
deep_summary_worker / summarize_worker(drain) 가 공유. StageDeferred 는 queue_consumer/
|
||||
queue_drain 이 attempts 미소모 + deferred_until 백오프로 처리한다 (sleep-안전 불변식).
|
||||
deep_summary_worker / summarize_worker(drain) / classify_worker(drain) 가 공유.
|
||||
StageDeferred 는 queue_consumer/queue_drain 이 attempts 미소모 + deferred_until
|
||||
백오프로 처리한다 (sleep-안전 불변식).
|
||||
|
||||
cfg: 지정 시 deep 슬롯 대신 이 config 로 호출 (classify drain — deep 슬롯의
|
||||
endpoint 는 쓰되 triage 의 temperature/max_tokens 를 적용한 변형).
|
||||
"""
|
||||
from models.queue import StageDeferred
|
||||
|
||||
try:
|
||||
if cfg is not None:
|
||||
return await client._request(cfg, prompt, system=system)
|
||||
return await client.call_deep(prompt, system=system)
|
||||
except Exception as exc:
|
||||
if is_deferrable_error(exc):
|
||||
@@ -231,20 +242,23 @@ class AIClient:
|
||||
|
||||
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
|
||||
|
||||
async def classify(self, text: str) -> dict:
|
||||
async def classify(self, text: str, cfg=None) -> dict:
|
||||
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
|
||||
|
||||
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
|
||||
cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 —
|
||||
drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라
|
||||
_call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파).
|
||||
"""
|
||||
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
|
||||
response = await self._call_chat(self.ai.primary, prompt)
|
||||
response = await self._call_chat(cfg or self.ai.primary, prompt)
|
||||
return response
|
||||
|
||||
async def summarize(self, text: str, force_premium: bool = False) -> str:
|
||||
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체."""
|
||||
async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str:
|
||||
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일."""
|
||||
if force_premium:
|
||||
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
||||
return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
||||
return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
|
||||
|
||||
async def embed(self, text: str) -> list[float]:
|
||||
"""벡터 임베딩 — GPU 서버 전용"""
|
||||
|
||||
@@ -244,7 +244,15 @@ async def regenerate(
|
||||
user: Annotated[User, Depends(require_admin)],
|
||||
):
|
||||
"""수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요)."""
|
||||
from core.config import settings
|
||||
from workers.digest_worker import run
|
||||
|
||||
# 홀드 중 silent no-op 방지 — 워커 게이트와 동일 조건을 표면에서 명시.
|
||||
if "digest" in settings.pipeline_held_stages:
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail="global_digest 보류 중 (config.yaml pipeline.held_stages) — 해제 후 재시도",
|
||||
)
|
||||
|
||||
asyncio.create_task(run())
|
||||
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
|
||||
|
||||
+160
-6
@@ -2,8 +2,9 @@
|
||||
|
||||
확정 결정:
|
||||
- D-1 경로 = /api/eid/chat (main.py prefix=/api/eid + 본 라우터 POST /chat)
|
||||
- D-2 mode 닫힌 어휘: daily(mac-mini-default) / deep(qwen-macbook). 클라는 mode 만 보냄 —
|
||||
claude-cloud / auto 금지 (Literal 로 422 차단). 심층(deep) 모드 무게이트.
|
||||
- D-2 mode 닫힌 어휘: daily / deep — 둘 다 mac-mini-default (맥북 백지화 2026-06-11,
|
||||
맥미니 Qwen 27B 단일 호스트. deep = ReAct 자동검색 모드 구분). 클라는 mode 만 보냄 —
|
||||
claude-cloud / auto 금지 (Literal 로 422 차단). 게이트 = alias 기준 자동 적용(무게이트 폐지).
|
||||
- D-3 독립 /chat 라우트 (frontend) — 본 모듈은 백엔드 API 만.
|
||||
- D-5 LLM 호출 = EidAIClient.call_stream 한 곳 (이드 egress 봉쇄 불변식 #5,
|
||||
RouterBackend 직접 호출 금지).
|
||||
@@ -18,24 +19,58 @@ backend 실패는 /api/search/ask 와 동일 shape 의 503 + error_reason 매핑
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Annotated, Literal
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from pydantic import BaseModel, Field, field_validator, model_validator
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from core.auth import get_current_user
|
||||
from core.database import get_session
|
||||
from core.utils import setup_logger
|
||||
from eid import compose as eid_compose
|
||||
from eid.ai import EidAIClient
|
||||
from models.user import User
|
||||
from services.llm.backends import BackendUnavailable
|
||||
from services.llm.backends import BackendUnavailable, _router_url, get_backend
|
||||
from services.search import llm_gate
|
||||
from services.search.react_loop import agentic_ask_loop
|
||||
|
||||
logger = setup_logger("eid_chat")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# ── ds-eid-ask-absorb P1: deep 모드 = ReAct 자동검색 (맥미니 Qwen 27B, 2026-06-11~) ──
|
||||
# 비생성 reachability probe — router 도달만 확인(coarse). 27B(맥북) 자체 미가용은
|
||||
# 첫 generate_with_tools 호출의 BackendUnavailable → mid-stream error envelope 로 커버
|
||||
# (plan: probe 정밀도 불필요, TOCTOU 는 in-stream error 가 처리). ~2s 타임아웃·생성 슬롯 비점유.
|
||||
_DEEP_PROBE_TIMEOUT = httpx.Timeout(connect=2.0, read=2.0, write=2.0, pool=2.0)
|
||||
# heartbeat: ReAct 다회 tool call 시 수십초 무출력 → 프록시 idle timeout 차단.
|
||||
# `{"phase":"ping"}` no-op 이벤트 (프론트 envelope 파서가 자연 스킵 — `: ping` comment 는
|
||||
# POST SSE fetch 파서가 처리 보장 안 됨).
|
||||
_HEARTBEAT_INTERVAL_S = 10.0
|
||||
|
||||
|
||||
async def _probe_router_reachable() -> bool:
|
||||
"""router(:8890) /v1/models GET — 도달 확인(비생성). 실패/비200 = 미가용."""
|
||||
url = f"{_router_url().rstrip('/')}/v1/models"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_DEEP_PROBE_TIMEOUT) as client:
|
||||
resp = await client.get(url)
|
||||
return resp.status_code == 200
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _sse(obj: dict) -> bytes:
|
||||
"""SSE 이벤트 1건 — data: <json>\\n\\n. final_answer 는 OpenAI 호환 choices.delta.content
|
||||
로, sources/phase 는 별 envelope 키로(프론트가 분기). model/usage 머신 메타 미포함."""
|
||||
return b"data: " + json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n\n"
|
||||
|
||||
|
||||
class ChatMessage(BaseModel):
|
||||
"""채팅 턴 1건. role=system 은 Literal 밖 → 422 (system 합본은 서버 compose 만 주입)."""
|
||||
@@ -71,16 +106,130 @@ class ChatRequest(BaseModel):
|
||||
return self
|
||||
|
||||
|
||||
@router.get("/status")
|
||||
async def eid_status(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
):
|
||||
"""이드 backend 점유 상태 스냅샷 — GET /api/eid/status (UI 의 "대기 vs 고장" 구분용).
|
||||
|
||||
daily(맥미니 MLX) 의 DS 프로세스 내부 llm_gate 점유만 본다 — 외부 소비자
|
||||
(맥미니 자체 derived-worker·Hermes 등)의 endpoint 점유는 미포착.
|
||||
따라서 busy=true 는 확실(지금 줄이 있다), false 는 근사(외부 점유 가능성 잔존).
|
||||
|
||||
가벼움 보장: DB 0 / LLM 0 / 본문 로깅 0 — 폴링 대상으로 안전.
|
||||
자동 fallback 판단 근거로 쓰지 않는다 (모드 전환 = 명시 버튼만, 정책).
|
||||
"""
|
||||
snap = llm_gate.gate_status()
|
||||
inflight = bool(snap["inflight"])
|
||||
waiters = int(snap["waiters"])
|
||||
return {
|
||||
"daily": {
|
||||
"busy": inflight or waiters > 0,
|
||||
"inflight": inflight,
|
||||
"waiters": waiters,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def _backend_unavailable_response(body: ChatRequest, reason: str, backend_name: str) -> JSONResponse:
|
||||
"""스트림 시작 전 27B 미가용 → ask 컨벤션과 동일 shape 503 (자동 fallback 0)."""
|
||||
logger.warning(
|
||||
"eid_chat backend_unavailable mode=%s turns=%d status=503 reason=%s",
|
||||
body.mode, len(body.messages), reason,
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={
|
||||
"error": "backend_unavailable",
|
||||
"error_reason": reason,
|
||||
"backend_requested": backend_name,
|
||||
"detail": (
|
||||
"심층 엔진(검색)이 일시적으로 응답할 수 없습니다. "
|
||||
"잠시 후 다시 시도하거나 일상 모드로 물어보세요."
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def _eid_chat_deep(body: ChatRequest, session: AsyncSession) -> StreamingResponse | JSONResponse:
|
||||
"""deep 모드 = ReAct 자동검색. ReAct(`tool_choice=auto`)가 검색 여부를 LLM 자율 판단 —
|
||||
검색 불요 질문은 early-exit 으로 대화 답변. substrate(persona+rules+react_ask task)는
|
||||
agentic_ask_loop 내부 compose("react_ask") 가 주입(evidence-first 자동 상속).
|
||||
|
||||
멀티턴 = 1단계는 마지막 user 메시지 단독 처리(agentic_ask_loop 가 query: str — history
|
||||
미지원). 후속 질문 대명사 해소는 2단계 백로그.
|
||||
"""
|
||||
# ① 첫 SSE 바이트(=HTTP 200 확정) 전 비생성 probe — router 도달 실패 시 503 (재매핑 가능 구간)
|
||||
if not await _probe_router_reachable():
|
||||
return _backend_unavailable_response(body, "router_unreachable", "mac-mini-default")
|
||||
|
||||
query = body.messages[-1].content # 메시지 단독 처리 (마지막 user 턴)
|
||||
backend = get_backend("mac-mini-default")
|
||||
|
||||
async def _stream() -> AsyncIterator[bytes]:
|
||||
# ② phase:searching 방출 = HTTP 200 확정. 이후 미가용은 503 불가 → in-stream error.
|
||||
yield _sse({"phase": "searching"})
|
||||
task = asyncio.create_task(agentic_ask_loop(session, query, backend=backend))
|
||||
try:
|
||||
# heartbeat: task 미완 동안 ~10s 마다 ping (shield 로 wait_for 취소가 task 안 죽임)
|
||||
while not task.done():
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.shield(task), timeout=_HEARTBEAT_INTERVAL_S)
|
||||
except asyncio.TimeoutError:
|
||||
yield _sse({"phase": "ping"})
|
||||
result = task.result() # BackendUnavailable 은 여기서 raise (mid-stream)
|
||||
# final_answer = OpenAI 호환 1청크(프론트 기존 content 누적 경로 재사용)
|
||||
yield _sse({"choices": [{"delta": {"content": result.final_answer}}]})
|
||||
# 근거 = 별 envelope (citation 번호 없음 — 프론트가 순서 기반). partial = 근거 부족 표식
|
||||
yield _sse({"eid_sources": result.sources, "partial": result.partial})
|
||||
yield b"data: [DONE]\n\n"
|
||||
logger.info(
|
||||
"eid_chat deep ok turns=%d sources=%d partial=%s iters=%d",
|
||||
len(body.messages), len(result.sources), result.partial, result.iterations,
|
||||
)
|
||||
except BackendUnavailable as exc:
|
||||
# mid-stream 미가용(검색 중 AC 분리·뚜껑 닫힘) — 200 이미 송신, in-stream error envelope.
|
||||
# error 뒤 [DONE] = 프론트 sawDone 로 '중단' 오경보 방지(명시 error notice 유지).
|
||||
logger.warning(
|
||||
"eid_chat deep mid-stream unavailable turns=%d reason=%s",
|
||||
len(body.messages), exc.reason,
|
||||
)
|
||||
yield _sse({"phase": "error", "error_reason": exc.reason})
|
||||
yield b"data: [DONE]\n\n"
|
||||
except asyncio.CancelledError:
|
||||
raise # 클라 disconnect — finally 가 task 정리
|
||||
except Exception:
|
||||
logger.exception("eid_chat deep stream failed turns=%d", len(body.messages))
|
||||
yield _sse({"phase": "error", "error_reason": "deep_failed"})
|
||||
yield b"data: [DONE]\n\n"
|
||||
finally:
|
||||
# 클라 disconnect 시 ReAct task 고아화 방지 — cancel + await(전파 완료 보장).
|
||||
# 안 하면 27B 가 닫힌 연결 위해 수분 점유, router 동시성상 다음 검색 대기.
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
return StreamingResponse(
|
||||
_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={"Cache-Control": "no-store", "X-Accel-Buffering": "no"},
|
||||
)
|
||||
|
||||
|
||||
@router.post("/chat")
|
||||
async def eid_chat(
|
||||
body: ChatRequest,
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
):
|
||||
"""이드 채팅 — router SSE 스트리밍 pass-through.
|
||||
"""이드 채팅 — daily = router SSE pass-through(대화) / deep = ReAct 자동검색(근거).
|
||||
|
||||
503 두 경로 (둘 다 자동 fallback 없음):
|
||||
503 경로 (모두 자동 fallback 없음):
|
||||
- substrate_degraded: rules.md 부재 (D-6 fail-closed, 채팅 진행 거부)
|
||||
- backend_unavailable: 스트림 시작 전 backend 실패 (ask 컨벤션과 동일 shape)
|
||||
- backend_unavailable: 스트림 시작 전 backend 실패 (daily/deep 공통, ask 컨벤션 shape)
|
||||
"""
|
||||
# D-6: rules 부재 = fail-closed. 채팅은 안전·정책 가드 없이 진행하지 않는다(배너 X).
|
||||
if not eid_compose.rules_present():
|
||||
@@ -99,6 +248,11 @@ async def eid_chat(
|
||||
},
|
||||
)
|
||||
|
||||
# deep = ReAct 자동검색 (별 흐름 — probe + 동기 ReAct → SSE 변환)
|
||||
if body.mode == "deep":
|
||||
return await _eid_chat_deep(body, session)
|
||||
|
||||
# daily = 순수 대화 SSE pass-through (기존)
|
||||
system = eid_compose.compose("eid_chat", task="")
|
||||
client = EidAIClient()
|
||||
stream = client.call_stream(
|
||||
|
||||
@@ -1,20 +1,30 @@
|
||||
"""처리 머신 보드 API — GET /api/queue/overview (plan ds-processing-ui-6an).
|
||||
"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
|
||||
|
||||
홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계 로직은
|
||||
services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와 계약 고정.
|
||||
응답에 raw 모델명 노출 금지 — 머신 label 만.
|
||||
- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
|
||||
로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
|
||||
계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
|
||||
FE 정적 맵 책임).
|
||||
- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
|
||||
(자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
|
||||
id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Annotated, Literal
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from core.auth import get_current_user
|
||||
from core.database import get_session
|
||||
from models.user import User
|
||||
from services.queue_overview import build_overview
|
||||
from services.queue_overview import (
|
||||
build_overview,
|
||||
fetch_failed_items,
|
||||
retry_failed,
|
||||
skip_failed,
|
||||
)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -64,11 +74,17 @@ class Totals(BaseModel):
|
||||
|
||||
|
||||
class StageRow(BaseModel):
|
||||
"""단계별 현황 행 — '단계 상세' 패널용 (완료 가시화)."""
|
||||
"""단계별 현황 행 — 흐름 노드/상세 패널용.
|
||||
|
||||
done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
|
||||
ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
|
||||
"""
|
||||
stage: str
|
||||
pending: int
|
||||
processing: int
|
||||
failed: int
|
||||
done_1h: int
|
||||
created_1h: int
|
||||
done_today: int
|
||||
oldest_pending_age_sec: int | None
|
||||
|
||||
@@ -81,6 +97,40 @@ class QueueOverviewResponse(BaseModel):
|
||||
totals: Totals
|
||||
|
||||
|
||||
class FailedItem(BaseModel):
|
||||
"""영구 실패 행 — 실패 드로어 표시 단위."""
|
||||
id: int
|
||||
stage: str
|
||||
document_id: int
|
||||
title: str
|
||||
attempts: int
|
||||
max_attempts: int
|
||||
error_message: str | None
|
||||
failed_at: datetime | None
|
||||
|
||||
|
||||
class FailedListResponse(BaseModel):
|
||||
items: list[FailedItem]
|
||||
total: int
|
||||
|
||||
|
||||
class QueueActionRequest(BaseModel):
|
||||
"""재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
|
||||
ids: list[int] = Field(min_length=1, max_length=300)
|
||||
|
||||
|
||||
class RetryResponse(BaseModel):
|
||||
requested: int
|
||||
retried: int
|
||||
not_retried: int
|
||||
|
||||
|
||||
class SkipResponse(BaseModel):
|
||||
requested: int
|
||||
skipped: int
|
||||
not_skipped: int
|
||||
|
||||
|
||||
@router.get("/overview", response_model=QueueOverviewResponse)
|
||||
async def get_queue_overview(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
@@ -88,3 +138,40 @@ async def get_queue_overview(
|
||||
):
|
||||
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
|
||||
return QueueOverviewResponse.model_validate(await build_overview(session))
|
||||
|
||||
|
||||
@router.get("/failed", response_model=FailedListResponse)
|
||||
async def get_failed_items(
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
):
|
||||
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
|
||||
items = await fetch_failed_items(session)
|
||||
return FailedListResponse(
|
||||
items=[FailedItem.model_validate(i) for i in items],
|
||||
total=len(items),
|
||||
)
|
||||
|
||||
|
||||
@router.post("/retry", response_model=RetryResponse)
|
||||
async def retry_failed_items(
|
||||
body: QueueActionRequest,
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
):
|
||||
"""실패 행 재시도 — attempts 리셋 + pending 복귀.
|
||||
|
||||
not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
|
||||
이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
|
||||
"""
|
||||
return RetryResponse.model_validate(await retry_failed(session, body.ids))
|
||||
|
||||
|
||||
@router.post("/skip", response_model=SkipResponse)
|
||||
async def skip_failed_items(
|
||||
body: QueueActionRequest,
|
||||
user: Annotated[User, Depends(get_current_user)],
|
||||
session: Annotated[AsyncSession, Depends(get_session)],
|
||||
):
|
||||
"""실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
|
||||
return SkipResponse.model_validate(await skip_failed(session, body.ids))
|
||||
|
||||
@@ -158,6 +158,17 @@ class Settings(BaseModel):
|
||||
# 업로드 한도 (authoritative policy)
|
||||
upload: UploadConfig = UploadConfig()
|
||||
|
||||
# 생성 LLM 홀드 (2026-06-11): config.yaml pipeline.held_stages 에 든 이름의
|
||||
# 컨슈머/워커는 claim 자체를 하지 않는다 (attempts 미소모, pending 적체 = 의도).
|
||||
# 유효 키 = 큐 stage 명(classify/summarize/deep_summary) + cron/컨슈머 키(digest,
|
||||
# briefing, study_explanation, study_session_analysis, study_memo_card).
|
||||
# 빈 리스트 = 무동작 (기존 동작 그대로).
|
||||
pipeline_held_stages: list[str] = []
|
||||
|
||||
# mlx gate 동시 실행 상한 (2026-06-12, config.yaml pipeline.mlx_gate_concurrency).
|
||||
# 1 = 구 single-inference 동작. 2 = continuous batching 활용 (llm_gate docstring 참조).
|
||||
mlx_gate_concurrency: int = 1
|
||||
|
||||
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
|
||||
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
|
||||
study_explanation_enabled: bool = True
|
||||
@@ -244,6 +255,21 @@ def load_settings() -> Settings:
|
||||
)
|
||||
)
|
||||
|
||||
pipeline_held_stages: list[str] = []
|
||||
mlx_gate_concurrency = 1
|
||||
if config_path.exists() and raw and "pipeline" in raw:
|
||||
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
|
||||
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
|
||||
if not isinstance(held_raw, (list, tuple)):
|
||||
held_raw = [held_raw]
|
||||
pipeline_held_stages = [str(s) for s in held_raw]
|
||||
try:
|
||||
mlx_gate_concurrency = max(
|
||||
1, int((raw.get("pipeline") or {}).get("mlx_gate_concurrency", 1))
|
||||
)
|
||||
except (TypeError, ValueError):
|
||||
mlx_gate_concurrency = 1
|
||||
|
||||
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
|
||||
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
|
||||
upload_cfg = (
|
||||
@@ -272,6 +298,8 @@ def load_settings() -> Settings:
|
||||
study_explanation_enabled=study_explanation_enabled,
|
||||
study_card_extract_enabled=study_card_extract_enabled,
|
||||
internal_worker_token=internal_worker_token,
|
||||
pipeline_held_stages=pipeline_held_stages,
|
||||
mlx_gate_concurrency=mlx_gate_concurrency,
|
||||
)
|
||||
|
||||
|
||||
|
||||
+10
-7
@@ -29,16 +29,19 @@ import httpx
|
||||
from ai.client import AIClient
|
||||
from services.llm.backends import (
|
||||
MAC_MINI_DEFAULT,
|
||||
QWEN_MACBOOK,
|
||||
BackendUnavailable,
|
||||
_router_url, # router URL 단일 출처 재사용 (settings → env LLM_ROUTER_URL → MVP default)
|
||||
)
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
|
||||
# 이드 채팅 mode → router alias 닫힌 매핑 (D-2). 클라는 mode 만 보냄 — claude-cloud/auto 금지.
|
||||
# 2026-06-11 맥북 백지화: deep 도 mac-mini-default (맥미니 Qwen 27B 단일 호스트).
|
||||
# mode 구분은 유지 — deep = ReAct 자동검색 경로(모델이 아니라 동작이 다름).
|
||||
# 게이트는 alias==MAC_MINI_DEFAULT 조건이라 deep 도 자동으로 mlx gate 적용
|
||||
# (llm_gate "예외 없이 gate 획득 필수" invariant 충족 — 구 무게이트는 맥북 예외였음).
|
||||
_CHAT_ALIAS: dict[str, str] = {
|
||||
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801 gemma-4-26b
|
||||
"deep": QWEN_MACBOOK, # router named upstream → M5 Max Qwen3.6-27B (무게이트, D-2)
|
||||
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801
|
||||
"deep": MAC_MINI_DEFAULT, # 맥북 폐기로 동일 upstream — ReAct 검색 모드 구분만 유지
|
||||
}
|
||||
|
||||
# read 는 per-chunk 적용이라 MacBook wake(24s)+토큰 생성 간격 커버. connect 는 내부 router 라 짧게.
|
||||
@@ -161,10 +164,10 @@ class EidAIClient(AIClient):
|
||||
_rewrite_sse_line 으로 model 치환(mode 어휘)·usage 제거만 하고 프레이밍은 보존.
|
||||
취소/disconnect 시 AsyncExitStack 이 response·client 정리(upstream 닫힘 보장).
|
||||
|
||||
daily(mac-mini-default)는 Mac mini MLX 단일 inference 영구 룰(llm_gate docstring
|
||||
"예외 없이 gate 획득 필수")에 따라 acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 —
|
||||
RouterBackend 의 requires_gate=True 와 동일한 client-side mutex 효과.
|
||||
deep(qwen-macbook)은 별 endpoint 라 무게이트 (D-2, RouterBackend 동형).
|
||||
daily/deep 모두 mac-mini-default(2026-06-11 맥북 백지화) → Mac mini MLX 단일
|
||||
inference 영구 룰(llm_gate docstring "예외 없이 gate 획득 필수")에 따라
|
||||
acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 — 게이트 조건이 alias 기준이라
|
||||
deep 도 자동 적용 (구 무게이트는 맥북 별 endpoint 시절 예외였음).
|
||||
|
||||
중계 전체(업스트림 진입~종료)는 asyncio.timeout(_STREAM_DEADLINE_S) wall-clock
|
||||
deadline 안 — llm_gate 계약 "timeout 은 gate 안쪽" 준수(gate 대기엔 미적용).
|
||||
|
||||
+4
-1
@@ -61,7 +61,7 @@ async def lifespan(app: FastAPI):
|
||||
from workers.csb_collector import run as csb_collector_run
|
||||
from workers.api_standards_collector import run as api_standards_run
|
||||
from workers.ccps_collector import run as ccps_collector_run
|
||||
from workers.queue_consumer import consume_queue, consume_markdown_queue
|
||||
from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue
|
||||
from workers.study_queue_consumer import consume_study_queue
|
||||
from workers.study_session_queue_consumer import consume_study_session_queue
|
||||
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
|
||||
@@ -95,6 +95,9 @@ async def lifespan(app: FastAPI):
|
||||
# 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을
|
||||
# stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지.
|
||||
scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer")
|
||||
# 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 —
|
||||
# classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례).
|
||||
scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer")
|
||||
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
|
||||
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
|
||||
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
|
||||
|
||||
@@ -14,6 +14,11 @@ from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.database import Base
|
||||
|
||||
# FK("users.id") 해석에 users 테이블 메타데이터 필요 — fastapi 앱은 어차피 전 모델을
|
||||
# import 하지만, CLI 단독 실행(queue_drain 등)은 본 모듈만 끌어와 INSERT 시
|
||||
# "could not find table 'users'" 로 실패했다 (2026-06-12 drain 로그 실측). 명시 import.
|
||||
from models.user import User # noqa: F401
|
||||
|
||||
|
||||
class AnalyzeEvent(Base):
|
||||
__tablename__ = "analyze_events"
|
||||
|
||||
@@ -22,7 +22,7 @@ from datetime import datetime, timedelta
|
||||
from posixpath import basename
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import bindparam, text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from core.config import settings
|
||||
@@ -258,6 +258,8 @@ def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
|
||||
"pending": st["pending"],
|
||||
"processing": st["processing"],
|
||||
"failed": st["failed"],
|
||||
"done_1h": st["done_1h"],
|
||||
"created_1h": st["created_1h"],
|
||||
"done_today": st["done_today"],
|
||||
"oldest_pending_age_sec": age,
|
||||
})
|
||||
@@ -408,3 +410,104 @@ async def build_overview(session: AsyncSession) -> dict:
|
||||
deep_enabled=deep_enabled,
|
||||
now_kst=now_kst,
|
||||
)
|
||||
|
||||
|
||||
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
|
||||
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
|
||||
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
|
||||
|
||||
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
|
||||
# started_at 을 시각 fallback 으로 쓴다.
|
||||
_FAILED_LIST_SQL = """
|
||||
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
|
||||
q.error_message,
|
||||
COALESCE(q.completed_at, q.started_at) AS failed_at,
|
||||
d.title, d.original_filename, d.file_path
|
||||
FROM processing_queue q
|
||||
JOIN documents d ON d.id = q.document_id
|
||||
WHERE q.status = 'failed'
|
||||
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
|
||||
LIMIT 300
|
||||
"""
|
||||
|
||||
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
|
||||
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
|
||||
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
|
||||
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
|
||||
_RETRY_SQL = """
|
||||
UPDATE processing_queue q
|
||||
SET status = 'pending', attempts = 0,
|
||||
started_at = NULL, completed_at = NULL
|
||||
WHERE q.id IN :ids
|
||||
AND q.status = 'failed'
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM processing_queue p
|
||||
WHERE p.document_id = q.document_id
|
||||
AND p.stage = q.stage
|
||||
AND p.status IN ('pending', 'processing')
|
||||
AND p.id <> q.id
|
||||
)
|
||||
RETURNING q.id
|
||||
"""
|
||||
|
||||
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
|
||||
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
|
||||
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
|
||||
_SKIP_SQL = """
|
||||
UPDATE processing_queue
|
||||
SET status = 'completed', completed_at = NOW(),
|
||||
payload = COALESCE(payload, '{}'::jsonb)
|
||||
|| jsonb_build_object('skipped_by_user', true,
|
||||
'skipped_at', NOW()::text)
|
||||
WHERE id IN :ids AND status = 'failed'
|
||||
RETURNING id
|
||||
"""
|
||||
|
||||
|
||||
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
|
||||
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
|
||||
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
|
||||
return [
|
||||
{
|
||||
"id": r[0],
|
||||
"stage": r[1],
|
||||
"document_id": r[2],
|
||||
"attempts": int(r[3] or 0),
|
||||
"max_attempts": int(r[4] or 0),
|
||||
"error_message": r[5],
|
||||
"failed_at": r[6],
|
||||
"title": display_title({
|
||||
"document_id": r[2],
|
||||
"title": r[7],
|
||||
"original_filename": r[8],
|
||||
"file_path": r[9],
|
||||
}),
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
||||
|
||||
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||||
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
|
||||
unique_ids = list(set(ids))
|
||||
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
|
||||
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||||
await session.commit()
|
||||
return {
|
||||
"requested": len(unique_ids),
|
||||
"retried": len(retried),
|
||||
"not_retried": len(unique_ids) - len(retried),
|
||||
}
|
||||
|
||||
|
||||
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||||
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
|
||||
unique_ids = list(set(ids))
|
||||
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
|
||||
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||||
await session.commit()
|
||||
return {
|
||||
"requested": len(unique_ids),
|
||||
"skipped": len(skipped),
|
||||
"not_skipped": len(unique_ids) - len(skipped),
|
||||
}
|
||||
|
||||
@@ -26,8 +26,11 @@ PR-MacBook-RAG-Backend-1 부터 `services.llm.QwenMacBookBackend` 는 별 endpoi
|
||||
- **fallback(Claude Sonnet 4 API) 경로는 gate 제외**. PR #20 이후 fallback = Claude API. 단 현재
|
||||
구현상 `AIClient._call_chat` 내부에서 primary→fallback 전환이 일어나므로
|
||||
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
|
||||
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
|
||||
inference 특성이 깨지지 않는 한 이 값을 올리지 말 것.
|
||||
- ~~**MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**~~ → **2026-06-12 개정**:
|
||||
구 룰의 전제(서버 = single-inference)가 소멸 — 현 mlx_vlm server 는 continuous
|
||||
batching 으로 동시 스트림 흡수(실측). 상한은 config `pipeline.mlx_gate_concurrency`
|
||||
(기본 1, 운영 2). **게이트 자체(상한+우선순위 큐)는 영구 유지** — thundering herd
|
||||
(23 concurrent → 22 timeout 사고) 방지는 계속 이 상한이 담당. 무제한 금지.
|
||||
|
||||
## 우선순위 정책 (B-1, 2026-05-17)
|
||||
|
||||
@@ -80,8 +83,22 @@ from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("llm_gate")
|
||||
|
||||
# MLX primary는 single-inference → 1
|
||||
MLX_CONCURRENCY = 1
|
||||
|
||||
def _capacity() -> int:
|
||||
"""게이트 동시 실행 상한 — config.yaml `pipeline.mlx_gate_concurrency` (기본 1).
|
||||
|
||||
2026-06-12 일반화: "MLX_CONCURRENCY = 1 고정" 영구 룰의 전제(구 서버 = single-
|
||||
inference, 23 concurrent → 22 timeout 실측)가 소멸 — 현 mlx_vlm server 는
|
||||
continuous batching 으로 동시 스트림을 흡수(2026-06-11 밤 6~8 concurrent 실측
|
||||
정상). 게이트 자체(상한 + 우선순위)는 유지하고 상한만 config 로 — thundering
|
||||
herd 재발 방지는 이 상한이 계속 담당한다. 런타임 매 acquire 시 조회라
|
||||
config 변경 + 프로세스 재기동으로 반영, 테스트는 settings monkeypatch.
|
||||
"""
|
||||
from core.config import settings
|
||||
try:
|
||||
return max(1, int(getattr(settings, "mlx_gate_concurrency", 1)))
|
||||
except (TypeError, ValueError):
|
||||
return 1
|
||||
|
||||
# Background waiter wait_ms 가 이 값 초과 시 WARN (starvation 신호, aging mitigation 은 Phase 2)
|
||||
STARVATION_WARN_MS = 300_000 # 5 min
|
||||
@@ -101,7 +118,7 @@ DEFAULT_PRIORITY: Priority = Priority.BACKGROUND
|
||||
# Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float)
|
||||
_waiters: list[tuple[int, int, asyncio.Future, float]] = []
|
||||
_seq = itertools.count()
|
||||
_inflight: bool = False
|
||||
_inflight_n: int = 0 # 동시 실행 수 (구 bool — capacity 일반화로 카운터)
|
||||
_lock: asyncio.Lock | None = None
|
||||
|
||||
|
||||
@@ -143,7 +160,7 @@ async def acquire_mlx_gate(
|
||||
|
||||
⚠ `asyncio.timeout` 은 반드시 gate 안쪽 (Future await 후) 에 둘 것.
|
||||
"""
|
||||
global _inflight, _waiters
|
||||
global _inflight_n, _waiters
|
||||
|
||||
lock = _get_lock()
|
||||
seq = next(_seq)
|
||||
@@ -152,9 +169,9 @@ async def acquire_mlx_gate(
|
||||
fut: asyncio.Future | None = None
|
||||
|
||||
async with lock:
|
||||
if not _inflight and not _waiters:
|
||||
if _inflight_n < _capacity() and not _waiters:
|
||||
# fast path — 즉시 inflight 진입, Future 생성 안 함
|
||||
_inflight = True
|
||||
_inflight_n += 1
|
||||
else:
|
||||
# 대기열 진입
|
||||
fut = asyncio.get_event_loop().create_future()
|
||||
@@ -194,8 +211,8 @@ async def acquire_mlx_gate(
|
||||
async with lock:
|
||||
next_fut = _dispatch_next_locked()
|
||||
if next_fut is None:
|
||||
_inflight = False
|
||||
# _inflight 는 True 유지 (다음 waiter 가 진입 예정)
|
||||
_inflight_n = max(0, _inflight_n - 1)
|
||||
# next_fut 가 있으면 슬롯 handover — 카운트 유지 (다음 waiter 가 진입 예정)
|
||||
logger.debug(
|
||||
"mlx_gate release duration_ms=%.0f priority=%s seq=%d",
|
||||
duration_ms, priority.name, seq,
|
||||
@@ -222,13 +239,24 @@ def get_mlx_gate():
|
||||
return acquire_mlx_gate(DEFAULT_PRIORITY)
|
||||
|
||||
|
||||
# ── Read-only status (UI 표시용) ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def gate_status() -> dict:
|
||||
"""현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용).
|
||||
|
||||
inflight = 동시 실행 수(int). 기존 소비자(eid status)는 bool() 캐스팅이라 호환.
|
||||
"""
|
||||
return {"inflight": _inflight_n, "waiters": len(_waiters)}
|
||||
|
||||
|
||||
# ── Test helpers (conftest reset) ────────────────────────────────────────────
|
||||
|
||||
|
||||
def _reset_for_test() -> None:
|
||||
"""테스트 fixture 가 fresh loop 마다 호출. production code 에서 사용 X."""
|
||||
global _waiters, _inflight, _lock, _seq
|
||||
global _waiters, _inflight_n, _lock, _seq
|
||||
_waiters = []
|
||||
_inflight = False
|
||||
_inflight_n = 0
|
||||
_lock = None
|
||||
_seq = itertools.count()
|
||||
|
||||
@@ -63,8 +63,41 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
|
||||
"chunks_table": "document_chunks_cand_snowflake_l_v2",
|
||||
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
|
||||
},
|
||||
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
|
||||
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
|
||||
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
|
||||
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
|
||||
"cand_qwen06": {
|
||||
"docs_table": "documents_cand_qwen06",
|
||||
"chunks_table": "document_chunks_cand_qwen06",
|
||||
"embed_endpoint": "http://ollama:11434/api/embed",
|
||||
"embed_kind": "ollama",
|
||||
"embed_model": "qwen3-embedding:0.6b",
|
||||
},
|
||||
"cand_qwen4": {
|
||||
"docs_table": "documents_cand_qwen4",
|
||||
"chunks_table": "document_chunks_cand_qwen4",
|
||||
"embed_endpoint": "http://ollama:11434/api/embed",
|
||||
"embed_kind": "ollama",
|
||||
"embed_model": "qwen3-embedding:4b",
|
||||
},
|
||||
"cand_qwen4m": {
|
||||
"docs_table": "documents_cand_qwen4m",
|
||||
"chunks_table": "document_chunks_cand_qwen4m",
|
||||
"embed_endpoint": "http://ollama:11434/api/embed",
|
||||
"embed_kind": "ollama",
|
||||
"embed_model": "qwen3-embedding:4b",
|
||||
"embed_dimensions": 1024,
|
||||
},
|
||||
}
|
||||
|
||||
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
|
||||
# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain).
|
||||
QWEN3_QUERY_INSTRUCT = (
|
||||
"Instruct: Given a web search query, retrieve relevant passages that answer the query"
|
||||
"\nQuery: "
|
||||
)
|
||||
|
||||
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
|
||||
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
|
||||
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
|
||||
@@ -137,6 +170,34 @@ async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
|
||||
return None
|
||||
|
||||
|
||||
async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None:
|
||||
"""Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix.
|
||||
|
||||
쿼리 측 전용: QWEN3_QUERY_INSTRUCT 를 선두에 붙인다 (문서 적재 = plain).
|
||||
embed_dimensions 지정(qwen4m) 시 Ollama dimensions 옵션 = MRL truncate+재정규화
|
||||
(G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 — slug 별 분포 상이.
|
||||
"""
|
||||
if not text_:
|
||||
return None
|
||||
import httpx
|
||||
body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]}
|
||||
if cfg.get("embed_dimensions"):
|
||||
body["dimensions"] = cfg["embed_dimensions"]
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60.0) as c:
|
||||
r = await c.post(cfg["embed_endpoint"], json=body)
|
||||
r.raise_for_status()
|
||||
embs = r.json().get("embeddings")
|
||||
if not isinstance(embs, list) or not embs or not isinstance(embs[0], list):
|
||||
raise ValueError("unexpected /api/embed shape")
|
||||
return embs[0]
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def _query_embed_key(text_: str) -> str:
|
||||
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
|
||||
|
||||
@@ -323,7 +384,10 @@ async def search_vector(
|
||||
else:
|
||||
docs_table = cfg["docs_table"]
|
||||
chunks_table = cfg["chunks_table"]
|
||||
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
|
||||
if cfg.get("embed_kind") == "ollama":
|
||||
query_embedding = await _embed_query_via_ollama(cfg, query)
|
||||
else:
|
||||
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
|
||||
|
||||
logger.info(
|
||||
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
|
||||
|
||||
@@ -47,7 +47,7 @@ logger = setup_logger("synthesis")
|
||||
|
||||
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
|
||||
PROMPT_VERSION = "v2"
|
||||
LLM_TIMEOUT_MS = 30000 # 2026-05-17 B-3: 15s 시 동시 부하 (Mac mini 26B classifier+evidence+synthesis serialized) 빈발 timeout — classifier (30s) 와 align
|
||||
LLM_TIMEOUT_MS = 120000 # 2026-06-11 Qwen3.6-27B-6bit 전환: 프리필 ~112 tok/s·디코드 ~11.7 tok/s 실측 — 30s 면 synthesis(답변 본체) 상시 timeout. synthesis 는 graceful skip 불가(=답변 실패)라 단독 상향, config ask.backend.timeout_read_s=120 와 align
|
||||
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
|
||||
CACHE_MAXSIZE = 300
|
||||
MAX_ANSWER_CHARS = 600
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import asyncio
|
||||
from datetime import date
|
||||
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from services.briefing.pipeline import run_briefing_pipeline
|
||||
|
||||
@@ -22,6 +23,9 @@ async def run(target_date: date | None = None) -> dict | None:
|
||||
Args:
|
||||
target_date: KST 기준 briefing_date (None = 오늘). API regenerate 가 명시 지정 가능.
|
||||
"""
|
||||
if "briefing" in settings.pipeline_held_stages:
|
||||
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
|
||||
return None
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
run_briefing_pipeline(target_date),
|
||||
|
||||
@@ -31,12 +31,18 @@ from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy import text as sql_text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from ai.client import AIClient, parse_json_response, strip_thinking
|
||||
from ai.client import (
|
||||
AIClient,
|
||||
call_deep_or_defer,
|
||||
is_deferrable_error,
|
||||
parse_json_response,
|
||||
strip_thinking,
|
||||
)
|
||||
from ai.envelope import EscalationEnvelope
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import enqueue_stage
|
||||
from models.queue import StageDeferred, enqueue_stage
|
||||
from policy.prompt_render import render_4b, policy_version as compute_policy_version
|
||||
from policy.routing import decide_routing
|
||||
from services.document_telemetry import record_analyze_event
|
||||
@@ -345,13 +351,20 @@ _FRONTMATTER_PRESERVED_KEYS = {
|
||||
# ───────────────────────── main process ────────────────────────────────
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
async def process(
|
||||
document_id: int, session: AsyncSession, *, use_deep: bool = False
|
||||
) -> None:
|
||||
"""문서 분류 + 요약 + tier triage.
|
||||
|
||||
1) Legacy: classify() → ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion
|
||||
2) Legacy: summarize() → ai_summary
|
||||
3) PR-B B-1: summary_triage (4B) → ai_tldr/ai_bullets/ai_analysis_tier='triage'
|
||||
|
||||
use_deep (2026-06-12 fair-share, queue_drain 전용): triage LLM 호출을 deep 슬롯
|
||||
(맥북, 라우터 경유)으로 보낸다 — sampling 은 triage 의 temperature/max_tokens 를
|
||||
유지(분류 결정성), endpoint 만 교체. 맥북 불가 = StageDeferred 전파(drain 이
|
||||
보류 처리). False(기본/consumer) = 기존 call_triage(맥미니 직접) 그대로.
|
||||
|
||||
예외 — source_channel='law_monitor':
|
||||
법령은 외부 source-of-truth (law.go.kr) 보유 + immutable + 자동 재수집.
|
||||
AI 분류는 무가치 + 본문 해석 환각 위험. 26B legacy + 4B triage 전부 skip.
|
||||
@@ -446,10 +459,20 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완")
|
||||
|
||||
client = AIClient()
|
||||
# fair-share (2026-06-12): use_deep 시 legacy classify/summarize 도 deep 슬롯(맥북)
|
||||
# 경유 — 그래야 drain 의 "맥북 분담" 이 실제로 성립 (triage 만 보내면 50K 요약
|
||||
# 프리필이 맥미니에 남는다). deep 슬롯 sampling = primary 와 동일(0.3/0.9/8192).
|
||||
legacy_cfg = settings.ai.deep if (use_deep and settings.ai.deep is not None) else None
|
||||
try:
|
||||
# ─── 1. Legacy classify (primary 26B) ───
|
||||
# ─── 1. Legacy classify (primary 또는 deep) ───
|
||||
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
|
||||
raw_response = await client.classify(truncated)
|
||||
try:
|
||||
raw_response = await client.classify(truncated, cfg=legacy_cfg)
|
||||
except Exception as exc:
|
||||
if legacy_cfg is not None and is_deferrable_error(exc):
|
||||
# 맥북 불가 — 첫 호출(최저 비용 지점)에서 보류로 전환, doc 쓰기 0
|
||||
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
|
||||
raise
|
||||
parsed = parse_json_response(raw_response)
|
||||
|
||||
if not parsed:
|
||||
@@ -517,12 +540,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"reason": "classify pipeline",
|
||||
}
|
||||
|
||||
# ─── 2. Legacy 요약 (primary 26B) ───
|
||||
summary = await client.summarize(doc.extracted_text[:50000])
|
||||
# ─── 2. Legacy 요약 (primary 또는 deep) ───
|
||||
try:
|
||||
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
|
||||
except Exception as exc:
|
||||
if legacy_cfg is not None and is_deferrable_error(exc):
|
||||
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
|
||||
raise
|
||||
doc.ai_summary = strip_thinking(summary)
|
||||
|
||||
# ─── 메타데이터 (legacy 완료) ───
|
||||
doc.ai_model_version = settings.ai.primary.model
|
||||
# ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
|
||||
doc.ai_model_version = (legacy_cfg or settings.ai.primary).model
|
||||
doc.ai_processed_at = datetime.now(timezone.utc)
|
||||
|
||||
logger.info(
|
||||
@@ -533,7 +561,9 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
|
||||
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
|
||||
try:
|
||||
await _run_tier_triage(client, doc, session)
|
||||
await _run_tier_triage(client, doc, session, use_deep=use_deep)
|
||||
except StageDeferred:
|
||||
raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리
|
||||
except Exception as exc:
|
||||
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
|
||||
|
||||
@@ -541,8 +571,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
await client.close()
|
||||
|
||||
|
||||
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
|
||||
"""summary_triage (p3a_short_summary) 경로."""
|
||||
async def _run_tier_triage(
|
||||
client: AIClient, doc: Document, session: AsyncSession, *, use_deep: bool = False
|
||||
) -> None:
|
||||
"""summary_triage (p3a_short_summary) 경로. use_deep = process() 에서 전달 (drain 전용)."""
|
||||
document_id = doc.id
|
||||
text = doc.extracted_text or ""
|
||||
input_chars = len(text)
|
||||
@@ -550,6 +582,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
|
||||
triage_start = time.perf_counter()
|
||||
parse_error: str | None = None
|
||||
triage_out = TriageOutput()
|
||||
# drain 경유 시 triage 도 deep 슬롯(맥북) — sampling 은 triage 것 유지(결정성).
|
||||
deep_triage_cfg = None
|
||||
if use_deep and settings.ai.deep is not None:
|
||||
deep_triage_cfg = settings.ai.deep.model_copy(update={
|
||||
"temperature": settings.ai.triage.temperature,
|
||||
"top_p": settings.ai.triage.top_p,
|
||||
"max_tokens": settings.ai.triage.max_tokens,
|
||||
})
|
||||
|
||||
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
|
||||
if input_chars > TRIAGE_TEXT_LIMIT:
|
||||
@@ -590,7 +630,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
|
||||
prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])
|
||||
|
||||
try:
|
||||
raw_triage = await client.call_triage(prompt)
|
||||
if deep_triage_cfg is not None:
|
||||
# drain 전용 — deep 슬롯 endpoint + triage sampling. 맥북 불가(StageDeferred)
|
||||
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
|
||||
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
|
||||
else:
|
||||
raw_triage = await client.call_triage(prompt)
|
||||
except StageDeferred:
|
||||
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"[triage] 4B 호출 실패 id=%s type=%s repr=%r",
|
||||
@@ -656,6 +703,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
|
||||
escalation_reason=escalation_reason,
|
||||
parse_error=parse_error,
|
||||
routing_decision=routing_decision,
|
||||
model_name=(deep_triage_cfg.model if deep_triage_cfg is not None else None),
|
||||
)
|
||||
|
||||
|
||||
@@ -670,6 +718,7 @@ async def _apply_triage_result(
|
||||
escalation_reason: str | None,
|
||||
parse_error: str | None,
|
||||
routing_decision=None,
|
||||
model_name: str | None = None, # fair-share: 실제 호출 경로 모델 (None=triage 기본)
|
||||
) -> None:
|
||||
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.
|
||||
|
||||
@@ -760,7 +809,7 @@ async def _apply_triage_result(
|
||||
layers_returned=["tldr", "bullets"] if not parse_error else [],
|
||||
cached=False,
|
||||
latency_ms=latency_ms,
|
||||
model_name=settings.ai.triage.model,
|
||||
model_name=(model_name or settings.ai.triage.model),
|
||||
prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
|
||||
error_code=parse_error,
|
||||
source="document_server",
|
||||
|
||||
@@ -54,8 +54,18 @@ class DeepSummaryOutput(BaseModel):
|
||||
confidence: float = 0.5
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""deep_summary 큐 pickup → 26B 호출 → 필드 저장."""
|
||||
async def process(
|
||||
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
|
||||
) -> None:
|
||||
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
|
||||
|
||||
defer_on_deep_unavailable:
|
||||
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 시 즉시
|
||||
맥미니 primary 로 처리. 2026-06-12 fair-share: 양 머신이 동일 모델
|
||||
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
|
||||
True (queue_drain 전용) = 맥북 불가를 StageDeferred 로 올려 drain 이
|
||||
보류 후 run 을 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
|
||||
"""
|
||||
doc = await session.get(Document, document_id)
|
||||
if not doc:
|
||||
raise ValueError(f"deep_summary: document id={document_id} 없음")
|
||||
@@ -111,16 +121,26 @@ async def process(document_id: int, session: AsyncSession) -> None:
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
if deep_cfg is not None:
|
||||
# 맥북 경유 — 맥미니 mlx gate 미점유(게이트는 맥미니 보호 목적). 맥북 불가
|
||||
# (503/연결/생성 중 sleep 절단)는 StageDeferred = 보류, 맥미니 강등 없음.
|
||||
# doc 쓰기는 완주+파싱 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
|
||||
raw = await call_deep_or_defer(client, prompt)
|
||||
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
|
||||
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
|
||||
try:
|
||||
raw = await call_deep_or_defer(client, prompt)
|
||||
except StageDeferred:
|
||||
if defer_on_deep_unavailable:
|
||||
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
|
||||
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
|
||||
logger.info(
|
||||
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
|
||||
)
|
||||
used_cfg = settings.ai.primary
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
raw = await client.call_primary(prompt)
|
||||
else:
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
|
||||
raw = await client.call_primary(prompt)
|
||||
latency_ms = int((time.perf_counter() - start) * 1000)
|
||||
except StageDeferred:
|
||||
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), consumer 가 백오프 기록.
|
||||
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
|
||||
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
|
||||
raise
|
||||
except Exception as exc:
|
||||
|
||||
@@ -10,6 +10,7 @@ global_digests / digest_topics 테이블에 저장한다.
|
||||
|
||||
import asyncio
|
||||
|
||||
from core.config import settings
|
||||
from core.utils import setup_logger
|
||||
from services.digest.pipeline import run_digest_pipeline
|
||||
|
||||
@@ -24,6 +25,9 @@ async def run() -> None:
|
||||
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리).
|
||||
여기서는 전체 hard cap 만 강제.
|
||||
"""
|
||||
if "digest" in settings.pipeline_held_stages:
|
||||
logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip")
|
||||
return
|
||||
try:
|
||||
result = await asyncio.wait_for(
|
||||
run_digest_pipeline(),
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
|
||||
|
||||
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
|
||||
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
|
||||
|
||||
설계 원칙 (plan r3):
|
||||
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) — 중단/재실행 시 이어서.
|
||||
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
|
||||
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
|
||||
cand 테이블은 동결 범위로만 INSERT (retrieval cand path 가 snapshot filter 를 안 타는 전제).
|
||||
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
|
||||
chunk_worker 의 [제목][섹션][본문]) + plain (instruct prefix 는 쿼리 측 전용 — G-1 불변식).
|
||||
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
|
||||
- qwen4m 은 본 CLI 대상이 아님 — qwen4 적재 후 SQL 파생(subvector+l2_normalize), plan E-1.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import text
|
||||
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from workers.embed_worker import _build_embed_input
|
||||
|
||||
logger = setup_logger("phase2a_cand_backfill")
|
||||
|
||||
OLLAMA_EMBED = "http://ollama:11434/api/embed"
|
||||
|
||||
TARGETS = {
|
||||
"qwen06": {
|
||||
"model": "qwen3-embedding:0.6b", "dim": 1024,
|
||||
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
|
||||
},
|
||||
"qwen4": {
|
||||
"model": "qwen3-embedding:4b", "dim": 2560,
|
||||
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
|
||||
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
|
||||
r.raise_for_status()
|
||||
embs = r.json()["embeddings"]
|
||||
if len(embs) != len(texts):
|
||||
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
|
||||
return embs
|
||||
|
||||
|
||||
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
|
||||
total = 0
|
||||
while True:
|
||||
async with async_session() as session:
|
||||
rows = (await session.execute(text(f"""
|
||||
SELECT d.id FROM documents d
|
||||
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
|
||||
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
|
||||
ORDER BY d.id LIMIT :b
|
||||
"""), {"m": doc_id_max, "b": batch})).scalars().all()
|
||||
if not rows:
|
||||
break
|
||||
docs = [(await session.get(Document, i)) for i in rows]
|
||||
inputs = [_build_embed_input(d) for d in docs]
|
||||
embs = await _embed_batch(http, target["model"], inputs)
|
||||
for d, inp, e in zip(docs, inputs, embs):
|
||||
await session.execute(text(f"""
|
||||
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
|
||||
VALUES (:i, :h, cast(:e AS vector))
|
||||
ON CONFLICT (doc_id) DO NOTHING
|
||||
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
|
||||
await session.commit()
|
||||
total += len(rows)
|
||||
if total % (batch * 10) < batch:
|
||||
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
|
||||
return total
|
||||
|
||||
|
||||
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
|
||||
total = 0
|
||||
while True:
|
||||
async with async_session() as session:
|
||||
rows = (await session.execute(text(f"""
|
||||
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
|
||||
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
|
||||
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
|
||||
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
|
||||
ORDER BY c.id LIMIT :b
|
||||
"""), {"m": chunk_id_max, "b": batch})).all()
|
||||
if not rows:
|
||||
break
|
||||
inputs = [
|
||||
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
|
||||
for r in rows
|
||||
]
|
||||
embs = await _embed_batch(http, target["model"], inputs)
|
||||
for r, e in zip(rows, embs):
|
||||
await session.execute(text(f"""
|
||||
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
|
||||
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
|
||||
"s": r.section_title, "t": r.text, "e": str(e)})
|
||||
await session.commit()
|
||||
total += len(rows)
|
||||
if total % (batch * 10) < batch:
|
||||
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
|
||||
return total
|
||||
|
||||
|
||||
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
|
||||
target = TARGETS[target_key]
|
||||
start = time.monotonic()
|
||||
async with httpx.AsyncClient() as http:
|
||||
nd = await backfill_docs(target, doc_id_max, batch, http)
|
||||
nc = await backfill_chunks(target, chunk_id_max, batch, http)
|
||||
mins = (time.monotonic() - start) / 60
|
||||
async with async_session() as session:
|
||||
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
|
||||
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
|
||||
logger.info(
|
||||
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
|
||||
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
|
||||
p.add_argument("--target", required=True, choices=sorted(TARGETS))
|
||||
p.add_argument("--doc-id-max", type=int, required=True)
|
||||
p.add_argument("--chunk-id-max", type=int, required=True)
|
||||
p.add_argument("--batch", type=int, default=32)
|
||||
a = p.parse_args()
|
||||
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -13,18 +13,25 @@ from sqlalchemy import select, update, delete, exists
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||
from sqlalchemy.orm import aliased
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
|
||||
|
||||
logger = setup_logger("queue_consumer")
|
||||
|
||||
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
|
||||
_hold_logged = False
|
||||
|
||||
# stage별 배치 크기
|
||||
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
||||
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
||||
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
|
||||
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
|
||||
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
|
||||
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
|
||||
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
|
||||
"fulltext": 3}
|
||||
STALE_THRESHOLD_MINUTES = 10
|
||||
@@ -34,14 +41,21 @@ STALE_THRESHOLD_MINUTES = 10
|
||||
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
|
||||
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
|
||||
|
||||
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리.
|
||||
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
|
||||
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
|
||||
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
|
||||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||||
MAIN_QUEUE_STAGES = [
|
||||
"extract", "classify", "summarize", "embed", "chunk",
|
||||
"extract", "classify", "summarize",
|
||||
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
|
||||
]
|
||||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||||
|
||||
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
|
||||
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
|
||||
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
|
||||
FAST_QUEUE_STAGES = ["embed", "chunk"]
|
||||
|
||||
|
||||
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
|
||||
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
|
||||
@@ -335,14 +349,43 @@ async def _process_stage(stage, worker_fn):
|
||||
|
||||
async def consume_queue():
|
||||
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
|
||||
global _hold_logged
|
||||
workers = _load_workers()
|
||||
|
||||
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
|
||||
if held and not _hold_logged:
|
||||
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
|
||||
_hold_logged = True
|
||||
|
||||
try:
|
||||
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||||
except Exception:
|
||||
logger.exception("stale reset failed, but continuing queue consumption")
|
||||
|
||||
for stage in MAIN_QUEUE_STAGES:
|
||||
if stage in settings.pipeline_held_stages:
|
||||
continue
|
||||
await _process_stage(stage, workers[stage])
|
||||
|
||||
|
||||
async def consume_fast_queue():
|
||||
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
|
||||
|
||||
main 루프는 classify/summarize/deep 가 사이클을 분 단위로 점유해 건당 <1s 짜리
|
||||
embed/chunk 가 사이클당 1번씩만 기회를 얻었다 (실효 ~60건/시 = 적체 원인).
|
||||
분리 후 = 1분 잡 × 배치 10 → 캡 ~600건/시. APScheduler max_instances=1 이라
|
||||
배치가 1분을 넘으면 다음 fire 는 coalesce (폭주 방지).
|
||||
"""
|
||||
workers = _load_workers()
|
||||
|
||||
try:
|
||||
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||||
except Exception:
|
||||
logger.exception("fast stale reset failed, but continuing queue consumption")
|
||||
|
||||
for stage in FAST_QUEUE_STAGES:
|
||||
if stage in settings.pipeline_held_stages:
|
||||
continue
|
||||
await _process_stage(stage, workers[stage])
|
||||
|
||||
|
||||
|
||||
@@ -30,9 +30,12 @@ from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
|
||||
|
||||
logger = setup_logger("queue_drain")
|
||||
|
||||
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유).
|
||||
# classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4).
|
||||
DRAIN_STAGES = ("summarize", "deep_summary")
|
||||
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 / classify = triage 분류.
|
||||
# classify 는 2026-06-12 fair-share 로 합류 — 구 제외 사유(plan Q-4 "triage 경량 = 맥미니
|
||||
# 적합")는 Gemma a4b(42 tok/s) 전제. Qwen 27B 전환 후 classify 가 장문 프리필로 컨슈머
|
||||
# 사이클을 점유하는 최대 병목이라, 맥북(프리필 ~5배)이 가장 효과적인 분담처다.
|
||||
# classify 완료 시 enqueue_next_stage(embed/chunk/markdown) 필수 — 누락 = DAG 단절.
|
||||
DRAIN_STAGES = ("summarize", "deep_summary", "classify")
|
||||
|
||||
|
||||
async def _claim_one(stage: str) -> tuple[int, int] | None:
|
||||
@@ -98,14 +101,16 @@ async def _mark_failed(queue_id: int, exc: Exception) -> None:
|
||||
|
||||
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
|
||||
if stage not in DRAIN_STAGES:
|
||||
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
|
||||
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용")
|
||||
if settings.ai.deep is None:
|
||||
raise SystemExit(
|
||||
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
|
||||
" (맥미니로의 silent 강등 금지)"
|
||||
)
|
||||
|
||||
from workers.classify_worker import process as classify_process
|
||||
from workers.deep_summary_worker import process as deep_summary_process
|
||||
from workers.queue_consumer import enqueue_next_stage
|
||||
from workers.summarize_worker import process as summarize_process
|
||||
|
||||
done = failed = 0
|
||||
@@ -121,11 +126,18 @@ async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int
|
||||
async with async_session() as worker_session:
|
||||
if stage == "summarize":
|
||||
await summarize_process(document_id, worker_session, use_deep=True)
|
||||
elif stage == "classify":
|
||||
await classify_process(document_id, worker_session, use_deep=True)
|
||||
else:
|
||||
# deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유
|
||||
await deep_summary_process(document_id, worker_session)
|
||||
# deep_summary: drain 은 맥북 전용 레버 — 불가 시 보류(폴백은 consumer 만)
|
||||
await deep_summary_process(
|
||||
document_id, worker_session, defer_on_deep_unavailable=True
|
||||
)
|
||||
await worker_session.commit()
|
||||
await _mark_completed(queue_id)
|
||||
# 다음 stage 연쇄 — classify 는 embed/chunk/markdown enqueue (consumer 와 동형,
|
||||
# summarize/deep_summary 는 next_stages 미등록이라 no-op)
|
||||
await enqueue_next_stage(document_id, stage)
|
||||
done += 1
|
||||
consecutive_defers = 0
|
||||
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
|
||||
|
||||
@@ -14,6 +14,7 @@ from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.study_memo_card_job import StudyMemoCardJob
|
||||
@@ -50,6 +51,10 @@ async def reset_stale_card_jobs() -> None:
|
||||
|
||||
async def consume_study_memo_card_queue() -> None:
|
||||
"""APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리."""
|
||||
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
|
||||
if "study_memo_card" in settings.pipeline_held_stages:
|
||||
logger.debug("study_memo_card 보류 (pipeline.held_stages)")
|
||||
return
|
||||
await reset_stale_card_jobs()
|
||||
|
||||
async with async_session() as session:
|
||||
|
||||
@@ -59,6 +59,11 @@ async def reset_stale_study_jobs() -> None:
|
||||
|
||||
async def consume_study_queue() -> None:
|
||||
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
|
||||
# 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트.
|
||||
# pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관).
|
||||
if "study_explanation" in settings.pipeline_held_stages:
|
||||
logger.debug("study_explanation 보류 (pipeline.held_stages)")
|
||||
return
|
||||
await reset_stale_study_jobs()
|
||||
|
||||
async with async_session() as session:
|
||||
|
||||
@@ -12,6 +12,7 @@ from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.study_quiz_session_job import StudyQuizSessionJob
|
||||
@@ -48,6 +49,10 @@ async def reset_stale_session_jobs() -> None:
|
||||
|
||||
async def consume_study_session_queue() -> None:
|
||||
"""APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리."""
|
||||
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
|
||||
if "study_session_analysis" in settings.pipeline_held_stages:
|
||||
logger.debug("study_session_analysis 보류 (pipeline.held_stages)")
|
||||
return
|
||||
await reset_stale_session_jobs()
|
||||
|
||||
async with async_session() as session:
|
||||
|
||||
+40
-12
@@ -6,25 +6,40 @@ ai:
|
||||
|
||||
models:
|
||||
# ─── 단일 generation 호스트 routing (2026-05-14 GPU LLM 제거) ───
|
||||
# GPU Ollama gemma4:e4b-it-q8_0 제거. Mac mini 26B-A4B 가 triage + primary + classifier 모두 흡수.
|
||||
# fallback 은 Claude Sonnet 4 API (Mac mini 다운 시 자동 trigger, premium 과 budget 공유).
|
||||
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E
|
||||
# 2026-06-11 B안: 맥미니 모델 = Gemma 26B-A4B → Qwen3.6-27B-6bit 풀교체 (사용자 결정).
|
||||
# dense 27B 라 디코드 ~13 tok/s 급 (a4b ~42 대비 감속) → timeout 상향 (triage 30→120, primary 180→300).
|
||||
# fallback 은 Claude Sonnet 4 API (CLAUDE_API_KEY 미주입 = 비활성).
|
||||
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E + project_macmini_model_decision
|
||||
|
||||
# triage: 상시 분류·요약·근거 선별. Mac mini 26B (primary 와 동일 endpoint, 짧은 max_tokens).
|
||||
# triage: 상시 분류·요약·근거 선별. Mac mini Qwen 27B (primary 와 동일 endpoint, 짧은 max_tokens).
|
||||
triage:
|
||||
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
|
||||
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
|
||||
model: "mlx-community/Qwen3.6-27B-6bit"
|
||||
max_tokens: 4096
|
||||
timeout: 30
|
||||
timeout: 480 # 프리필 실측 ~112 tok/s — 120K자 장문 커버 (2026-06-11)
|
||||
context_char_limit: 120000
|
||||
temperature: 0.0
|
||||
|
||||
# primary: 에스컬레이션 전용. 26B MLX (맥미니 Semaphore(1) 보호 대상).
|
||||
# primary: 에스컬레이션 전용. Qwen 27B MLX (맥미니 Semaphore(1) 보호 대상).
|
||||
primary:
|
||||
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
|
||||
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
|
||||
model: "mlx-community/Qwen3.6-27B-6bit"
|
||||
max_tokens: 8192
|
||||
timeout: 180
|
||||
timeout: 900 # 프리필 실측 ~112 tok/s — 260K자 상한 장문 커버 (2026-06-11)
|
||||
context_char_limit: 260000
|
||||
temperature: 0.3
|
||||
top_p: 0.9
|
||||
|
||||
# deep: 야간 night-drain 전용 — 맥북 M5 Max Qwen3.6-27B-6bit (llm-router :8890 경유,
|
||||
# model=qwen-macbook alias). 2026-06-11 재도입 (사용자: 자기 전 night-drain 으로 백로그 분담).
|
||||
# 맥북 불가(503/연결/절단) = StageDeferred 보류 — 맥미니/cloud 강등 없음, attempts 미소모.
|
||||
# consumer 의 deep_summary 도 슬롯 존재 시 맥북 경유 (잠들어 있으면 30분 백오프 보류 = 무해).
|
||||
# 슬롯 제거 시 deep_summary 는 primary(맥미니) 경로 복귀.
|
||||
deep:
|
||||
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
|
||||
model: "qwen-macbook"
|
||||
max_tokens: 8192
|
||||
timeout: 900
|
||||
context_char_limit: 260000
|
||||
temperature: 0.3
|
||||
top_p: 0.9
|
||||
@@ -58,9 +73,9 @@ ai:
|
||||
# classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only).
|
||||
classifier:
|
||||
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
|
||||
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
|
||||
model: "mlx-community/Qwen3.6-27B-6bit" # 2026-06-11 B안 동승 — gemma id 잔존 시 mlx 서버가 Gemma 를 재로드(이중 적재) 위험
|
||||
max_tokens: 512
|
||||
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진 (Mac mini 26B concurrent load). classifier_service.LLM_TIMEOUT_MS=30000 와 align
|
||||
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진. classifier_service.LLM_TIMEOUT_MS=30000 와 align (초과 = score-only skip, graceful)
|
||||
# 제거: vision (미사용)
|
||||
|
||||
# ─── deep_summary enqueue 폭발 억제 (B-1 R2) ───
|
||||
@@ -84,7 +99,7 @@ search:
|
||||
macbook_url: "http://100.118.112.84:8810" # MacBook M5 Max Tailscale interface bind
|
||||
macbook_model: "mlx-community/Qwen3.6-27B-8bit"
|
||||
timeout_connect_s: 1 # MacBook sleep/wake 빠른 감지 (자동 fallback 부재 → 빠른 503)
|
||||
timeout_read_s: 30 # synthesis_service.LLM_TIMEOUT_MS=30000 와 align
|
||||
timeout_read_s: 120 # 2026-06-11 Qwen 27B(디코드 ~11.7 tok/s) — synthesis_service.LLM_TIMEOUT_MS=120000 와 align
|
||||
# PR-DocSrv-Ask-ToolCalling-ReAct-1: /api/search/ask/react ReAct loop (qwen-macbook only)
|
||||
react:
|
||||
enabled: true
|
||||
@@ -176,3 +191,16 @@ schedule:
|
||||
daily_digest: "20:00"
|
||||
file_watcher_interval_minutes: 5
|
||||
queue_consumer_interval_minutes: 10
|
||||
|
||||
# 생성 LLM 홀드 게이트 (2026-06-11 신설): held_stages 에 든 이름의 컨슈머/워커는 claim 자체를
|
||||
# 하지 않는다 (attempts 미소모, pending 적체). 유효 키 8 = classify/summarize/deep_summary(큐) +
|
||||
# digest/briefing(cron) + study_explanation/study_session_analysis/study_memo_card(컨슈머).
|
||||
# 그 외 문자열은 무동작(오타 주의). 적용/해제 = 리스트 수정 후 fastapi 재기동.
|
||||
# 이력: 2026-06-11 맥미니 모델 확정까지 8키 홀드 → 同日 Qwen3.6-27B-6bit 전환과 함께 해제([]).
|
||||
pipeline:
|
||||
held_stages: []
|
||||
# mlx gate 동시 실행 상한 (2026-06-12 fair-share): 구 "1 고정" 룰의 전제(single-inference
|
||||
# 서버)가 소멸 — 현 mlx_vlm 은 continuous batching (2026-06-11 밤 6~8 concurrent 실측 정상).
|
||||
# 2 = 워커 LLM 호출과 인터랙티브(ask/eid)가 서로 안 막힘 + 집계 throughput ~1.8배.
|
||||
# 게이트(상한+우선순위)는 유지 — thundering herd 방지. 1 로 되돌리면 구 동작.
|
||||
mlx_gate_concurrency: 2
|
||||
|
||||
@@ -0,0 +1,419 @@
|
||||
<script lang="ts">
|
||||
// 처리 머신 보드 v2 — 파이프라인 흐름 뷰 (plan ds-board-engines-1, R2 통합안).
|
||||
// 메인 = 좌→우 흐름 노드(병목 amber·실패 뱃지), 노드 클릭 = 상세 패널(안1 변형),
|
||||
// 실패 뱃지 클릭 = 실패 처리 드로어 (재시도/건너뛰기 — 영구 실패의 유일한 조치 경로).
|
||||
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어 열 때).
|
||||
import { api } from '$lib/api';
|
||||
import { refreshQueueOverview } from '$lib/stores/queueOverview';
|
||||
import { addToast } from '$lib/stores/toast';
|
||||
import {
|
||||
AUX_NODES,
|
||||
FLOW_NODES,
|
||||
MACHINE_META,
|
||||
type FlowNodeDef,
|
||||
etaShort,
|
||||
flowStageLabel,
|
||||
formatAgeSec,
|
||||
formatRate,
|
||||
} from '$lib/utils/queueDisplay';
|
||||
import type {
|
||||
FailedItem,
|
||||
FailedListResponse,
|
||||
MachineCurrentItem,
|
||||
QueueOverview,
|
||||
QueueStageRow,
|
||||
RetryResponse,
|
||||
SkipResponse,
|
||||
} from '$lib/types/queue';
|
||||
|
||||
let { overview }: { overview: QueueOverview } = $props();
|
||||
|
||||
// ─── 노드 통계 합성 ───
|
||||
interface NodeStats {
|
||||
def: FlowNodeDef;
|
||||
/** 다중 stage 노드(청크·임베딩)는 같은 문서가 양쪽 큐에 있어 max — 합산 = 이중계산 */
|
||||
pending: number;
|
||||
processing: number;
|
||||
failed: number; // 실패는 행 단위 사실이라 합산
|
||||
done1h: number;
|
||||
created1h: number;
|
||||
doneToday: number;
|
||||
oldestAgeSec: number | null;
|
||||
etaMinutes: number | null;
|
||||
inflowDominant: boolean;
|
||||
perStage: QueueStageRow[];
|
||||
}
|
||||
|
||||
const stageBy = $derived(new Map(overview.stages.map((s) => [s.stage, s])));
|
||||
|
||||
function nodeStats(def: FlowNodeDef): NodeStats {
|
||||
const rows = def.stages
|
||||
.map((s) => stageBy.get(s))
|
||||
.filter((r): r is QueueStageRow => r != null);
|
||||
const pending = rows.reduce((m, r) => Math.max(m, r.pending), 0);
|
||||
const done1h = rows.reduce((m, r) => Math.max(m, r.done_1h), 0);
|
||||
const created1h = rows.reduce((m, r) => Math.max(m, r.created_1h), 0);
|
||||
const oldest = rows.reduce<number | null>(
|
||||
(m, r) => (r.oldest_pending_age_sec == null ? m : Math.max(m ?? 0, r.oldest_pending_age_sec)),
|
||||
null,
|
||||
);
|
||||
return {
|
||||
def,
|
||||
pending,
|
||||
processing: rows.reduce((s, r) => s + r.processing, 0),
|
||||
failed: rows.reduce((s, r) => s + r.failed, 0),
|
||||
done1h,
|
||||
created1h,
|
||||
doneToday: rows.reduce((m, r) => Math.max(m, r.done_today), 0),
|
||||
oldestAgeSec: oldest,
|
||||
etaMinutes: pending > 0 && done1h > 0 ? Math.round((pending / done1h) * 60) : null,
|
||||
inflowDominant: pending > 0 && created1h > done1h,
|
||||
perStage: rows,
|
||||
};
|
||||
}
|
||||
|
||||
const mainNodes = $derived(FLOW_NODES.map(nodeStats));
|
||||
const auxAll = $derived(AUX_NODES.map(nodeStats));
|
||||
const auxActive = $derived(
|
||||
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday > 0),
|
||||
);
|
||||
const auxIdle = $derived(
|
||||
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday === 0),
|
||||
);
|
||||
const totalFailed = $derived(overview.totals.failed);
|
||||
|
||||
// 머신 스트립 — overview.machines 의 state/처리율 + 정적 모델 메타
|
||||
const machineStrip = $derived(
|
||||
overview.machines.map((m) => ({
|
||||
...m,
|
||||
meta: MACHINE_META[m.key],
|
||||
})),
|
||||
);
|
||||
|
||||
// ─── 선택 상태 (노드 상세 / 실패 드로어 — 동시에 하나만) ───
|
||||
let selected = $state<string | null>(null);
|
||||
let failOpen = $state(false);
|
||||
|
||||
function toggleNode(key: string) {
|
||||
selected = selected === key ? null : key;
|
||||
if (selected) failOpen = false;
|
||||
}
|
||||
|
||||
const selectedNode = $derived(
|
||||
[...mainNodes, ...auxAll].find((n) => n.def.key === selected) ?? null,
|
||||
);
|
||||
|
||||
function nodeCurrent(def: FlowNodeDef): MachineCurrentItem[] {
|
||||
return overview.machines.flatMap((m) => m.current.filter((c) => def.stages.includes(c.stage)));
|
||||
}
|
||||
|
||||
// ─── 실패 드로어 ───
|
||||
let failItems = $state<FailedItem[]>([]);
|
||||
let failLoading = $state(false);
|
||||
let busy = $state(false);
|
||||
let expanded = $state<Record<string, boolean>>({});
|
||||
|
||||
async function openFailures() {
|
||||
failOpen = true;
|
||||
selected = null;
|
||||
await loadFailures();
|
||||
}
|
||||
|
||||
async function loadFailures() {
|
||||
failLoading = true;
|
||||
try {
|
||||
const r = await api<FailedListResponse>('/queue/failed');
|
||||
failItems = r.items;
|
||||
} catch {
|
||||
addToast('error', '실패 목록을 불러오지 못했습니다');
|
||||
} finally {
|
||||
failLoading = false;
|
||||
}
|
||||
}
|
||||
|
||||
interface FailGroup {
|
||||
key: string;
|
||||
stage: string;
|
||||
pattern: string;
|
||||
items: FailedItem[];
|
||||
}
|
||||
|
||||
// 그룹핑 = stage + 에러 메시지 prefix(36자) — 같은 원인(ReadTimeout 등) 묶음
|
||||
const failGroups = $derived.by(() => {
|
||||
const map = new Map<string, FailGroup>();
|
||||
for (const it of failItems) {
|
||||
const pattern = (it.error_message ?? '(메시지 없음)').slice(0, 36);
|
||||
const key = `${it.stage}::${pattern}`;
|
||||
const g = map.get(key);
|
||||
if (g) g.items.push(it);
|
||||
else map.set(key, { key, stage: it.stage, pattern, items: [it] });
|
||||
}
|
||||
return [...map.values()].sort(
|
||||
(a, b) => a.stage.localeCompare(b.stage) || b.items.length - a.items.length,
|
||||
);
|
||||
});
|
||||
|
||||
async function retryIds(ids: number[]) {
|
||||
if (busy || ids.length === 0) return;
|
||||
busy = true;
|
||||
try {
|
||||
const r = await api<RetryResponse>('/queue/retry', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ ids }),
|
||||
});
|
||||
addToast(
|
||||
'success',
|
||||
`재시도 ${r.retried}건 큐 재진입${r.not_retried > 0 ? ` (${r.not_retried}건 제외 — 이미 활성/처리됨)` : ''}`,
|
||||
);
|
||||
await afterAction();
|
||||
} catch {
|
||||
addToast('error', '재시도 요청 실패');
|
||||
} finally {
|
||||
busy = false;
|
||||
}
|
||||
}
|
||||
|
||||
async function skipIds(ids: number[]) {
|
||||
if (busy || ids.length === 0) return;
|
||||
busy = true;
|
||||
try {
|
||||
const r = await api<SkipResponse>('/queue/skip', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ ids }),
|
||||
});
|
||||
addToast('success', `건너뛰기 ${r.skipped}건 처리 (해당 단계 제외)`);
|
||||
await afterAction();
|
||||
} catch {
|
||||
addToast('error', '건너뛰기 요청 실패');
|
||||
} finally {
|
||||
busy = false;
|
||||
}
|
||||
}
|
||||
|
||||
async function afterAction() {
|
||||
await Promise.all([loadFailures(), refreshQueueOverview()]);
|
||||
}
|
||||
|
||||
// ─── trend_24h 스파크라인 (summarize 유입 vs 소화 — API 가 주는데 미렌더이던 슬롯) ───
|
||||
const spark = $derived.by(() => {
|
||||
const t = overview.trend_24h;
|
||||
if (!t || t.length === 0) return null;
|
||||
const max = Math.max(1, ...t.map((b) => Math.max(b.inflow, b.done)));
|
||||
const w = 120;
|
||||
const h = 24;
|
||||
const step = w / Math.max(1, t.length - 1);
|
||||
const pts = (sel: (b: (typeof t)[number]) => number) =>
|
||||
t.map((b, i) => `${(i * step).toFixed(1)},${(h - (sel(b) / max) * (h - 3) + 1).toFixed(1)}`).join(' ');
|
||||
return { inflow: pts((b) => b.inflow), done: pts((b) => b.done) };
|
||||
});
|
||||
</script>
|
||||
|
||||
<div class="mt-5">
|
||||
<!-- 헤더: 타이틀 + 요약 24h 스파크라인 + 실패 합계 -->
|
||||
<div class="flex items-center justify-between gap-3 mb-3">
|
||||
<div class="text-[11px] font-bold text-dim uppercase tracking-wider">처리 머신</div>
|
||||
<div class="flex items-center gap-3">
|
||||
{#if totalFailed > 0}
|
||||
<button
|
||||
class="text-[11px] font-semibold text-error hover:underline cursor-pointer"
|
||||
onclick={openFailures}
|
||||
>실패 {totalFailed}건 처리</button>
|
||||
{/if}
|
||||
{#if spark}
|
||||
<div class="flex items-center gap-2 text-[10px] text-faint tabular-nums" title="요약(summarize) 단계 24시간 — 유입(회색) vs 소화(녹색)">
|
||||
<svg width="120" height="24" viewBox="0 0 120 24" class="block">
|
||||
<polyline points={spark.inflow} fill="none" stroke="currentColor" stroke-width="1.5" class="text-faint" />
|
||||
<polyline points={spark.done} fill="none" stroke="currentColor" stroke-width="1.5" class="text-success" />
|
||||
</svg>
|
||||
<span>요약 24h 유입/소화</span>
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- 머신 스트립 -->
|
||||
<div class="flex flex-wrap gap-2 mb-3">
|
||||
{#each machineStrip as m (m.key)}
|
||||
<div class="flex items-center gap-2 bg-surface border border-default rounded-full px-3.5 py-1.5 text-xs">
|
||||
<span class="w-2 h-2 rounded-full shrink-0 {m.state === 'active' ? 'bg-success' : m.state === 'deferred' ? 'bg-warning' : 'bg-faint'}"></span>
|
||||
<span class="font-bold text-text">{m.meta?.label ?? m.label}</span>
|
||||
<span class="text-[10px] text-faint font-mono">{m.meta?.model}</span>
|
||||
<span class="text-[11px] text-dim tabular-nums">{formatRate(m.done_1h)}/h</span>
|
||||
{#if m.key === 'macbook' && m.deferred_pending > 0}
|
||||
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {m.deferred_pending}</span>
|
||||
{/if}
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
|
||||
<!-- 흐름 노드 -->
|
||||
<div class="flex items-stretch overflow-x-auto pb-1">
|
||||
{#each mainNodes as n, i (n.def.key)}
|
||||
{#if i > 0}
|
||||
<div class="flex items-center text-faint text-sm px-1.5 shrink-0" aria-hidden="true">→</div>
|
||||
{/if}
|
||||
<div
|
||||
class="relative bg-surface border-[1.5px] rounded-card px-3 py-2.5 min-w-[124px] shrink-0 text-left transition-colors cursor-pointer hover:bg-surface-hover
|
||||
{n.inflowDominant ? 'border-warning' : n.etaMinutes != null && n.def.stages.includes('chunk') ? 'border-success' : 'border-default'}
|
||||
{selected === n.def.key ? 'node-sel' : ''}"
|
||||
role="button"
|
||||
tabindex="0"
|
||||
onclick={() => toggleNode(n.def.key)}
|
||||
onkeydown={(e) => { if (e.key === 'Enter' || e.key === ' ') { e.preventDefault(); toggleNode(n.def.key); } }}
|
||||
title="{n.def.label} — 클릭하면 상세"
|
||||
>
|
||||
{#if n.failed > 0}
|
||||
<button
|
||||
class="absolute -top-2 -right-1.5 text-[9px] font-extrabold bg-error text-white rounded-full px-1.5 py-px shadow cursor-pointer"
|
||||
onclick={(e) => { e.stopPropagation(); openFailures(); }}
|
||||
title="실패 {n.failed}건 — 클릭하면 실패 처리"
|
||||
>{n.failed}</button>
|
||||
{/if}
|
||||
<span class="inline-block text-[9px] font-bold rounded px-1.5 py-px mb-1.5 mtag-{n.def.machine}">
|
||||
{MACHINE_META[n.def.machine].label} · {n.def.engine}
|
||||
</span>
|
||||
<div class="text-xs font-bold text-text flex items-center gap-1.5">
|
||||
{n.def.label}
|
||||
{#if n.processing > 0}
|
||||
<span class="inline-block w-1.5 h-1.5 rounded-full bg-accent animate-pulse" title="처리 중 {n.processing}"></span>
|
||||
{/if}
|
||||
{#if n.inflowDominant}
|
||||
<span class="text-[9px] font-bold text-warning">유입 우세</span>
|
||||
{/if}
|
||||
</div>
|
||||
<div class="text-base font-extrabold tabular-nums tracking-tight leading-tight mt-0.5 text-text">
|
||||
{n.pending.toLocaleString()}
|
||||
</div>
|
||||
<div class="text-[10px] text-dim tabular-nums">
|
||||
{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}
|
||||
{#if n.etaMinutes != null && !n.inflowDominant && n.pending > 0}
|
||||
· <span class="text-accent font-semibold">{etaShort(n.etaMinutes)}</span>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
|
||||
<!-- 보조 라인 -->
|
||||
<p class="text-[10px] text-faint mt-1.5 tabular-nums">
|
||||
{#each auxActive as n, i (n.def.key)}
|
||||
{i > 0 ? ' · ' : '보조: '}{n.def.label}({n.def.engine}) 대기 {n.pending.toLocaleString()} · {formatRate(n.done1h)}/h{n.failed > 0 ? ` · 실패 ${n.failed}` : ''}
|
||||
{/each}
|
||||
{#if auxIdle.length > 0}
|
||||
{auxActive.length > 0 ? ' — ' : ''}한가: {auxIdle.map((n) => n.def.label).join(' · ')}
|
||||
{/if}
|
||||
— 뉴스 등 일부 소스는 분류/추출을 건너뜀 (흐름 그림은 대표 경로)
|
||||
</p>
|
||||
|
||||
<!-- 상세 패널 (노드 클릭) -->
|
||||
{#if selectedNode}
|
||||
<div class="border rounded-card mt-3 overflow-hidden bg-surface detail-frame">
|
||||
<div class="flex items-center gap-2.5 px-4 py-2.5 text-xs font-bold detail-head">
|
||||
{selectedNode.def.label} — {selectedNode.def.engine}
|
||||
<span class="text-[10px] font-mono font-medium text-dim bg-surface border border-default rounded px-1.5">{selectedNode.def.sub} · {MACHINE_META[selectedNode.def.machine].label}</span>
|
||||
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (selected = null)}>닫기</button>
|
||||
</div>
|
||||
<div class="px-4 pb-3.5">
|
||||
<div class="grid grid-cols-2 md:grid-cols-4 gap-2.5 my-2.5">
|
||||
<div class="bg-bg border border-default rounded-card px-3 py-2">
|
||||
<div class="text-[9px] text-faint uppercase tracking-wide">대기</div>
|
||||
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.pending.toLocaleString()}</div>
|
||||
</div>
|
||||
<div class="bg-bg border border-default rounded-card px-3 py-2">
|
||||
<div class="text-[9px] text-faint uppercase tracking-wide">처리율 (1h)</div>
|
||||
<div class="text-lg font-extrabold tabular-nums text-text">{formatRate(selectedNode.done1h)}<span class="text-[11px] text-dim font-semibold">/h</span></div>
|
||||
</div>
|
||||
<div class="bg-bg border border-default rounded-card px-3 py-2">
|
||||
<div class="text-[9px] text-faint uppercase tracking-wide">오늘 완료</div>
|
||||
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.doneToday.toLocaleString()}</div>
|
||||
</div>
|
||||
<div class="bg-bg border border-default rounded-card px-3 py-2">
|
||||
<div class="text-[9px] text-faint uppercase tracking-wide">소진 예상</div>
|
||||
<div class="text-lg font-extrabold tabular-nums {selectedNode.inflowDominant ? 'text-warning' : 'text-accent'}">
|
||||
{#if selectedNode.inflowDominant}유입 우세{:else if selectedNode.etaMinutes != null}{etaShort(selectedNode.etaMinutes)}{:else if selectedNode.pending === 0}한가{:else}—{/if}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{#if selectedNode.perStage.length > 1}
|
||||
{#each selectedNode.perStage as row (row.stage)}
|
||||
<div class="flex items-center gap-2.5 py-1.5 border-t border-default text-xs">
|
||||
<span class="font-semibold text-text min-w-[72px]">{flowStageLabel(row.stage)}</span>
|
||||
<span class="ml-auto text-dim tabular-nums">
|
||||
대기 <strong class="text-text">{row.pending.toLocaleString()}</strong>
|
||||
· {formatRate(row.done_1h)}/h · 오늘 {row.done_today.toLocaleString()}
|
||||
{#if row.failed > 0}· <span class="text-error font-semibold">실패 {row.failed}</span>{/if}
|
||||
</span>
|
||||
</div>
|
||||
{/each}
|
||||
{/if}
|
||||
<div class="text-[11px] text-dim border-t border-dashed border-default mt-2 pt-2 tabular-nums">
|
||||
{#if selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600}
|
||||
가장 오래 기다린 항목 {formatAgeSec(selectedNode.oldestAgeSec)}
|
||||
{/if}
|
||||
{#each nodeCurrent(selectedNode.def) as c, i (c.document_id + c.stage)}
|
||||
{i === 0 && !(selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600) ? '' : ' · '}지금: {c.title} ({flowStageLabel(c.stage)})
|
||||
{/each}
|
||||
{#if selectedNode.failed > 0}
|
||||
· <button class="text-error font-semibold cursor-pointer hover:underline" onclick={openFailures}>실패 {selectedNode.failed}건 처리</button>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
<!-- 실패 처리 드로어 -->
|
||||
{#if failOpen}
|
||||
<div class="border border-error/40 rounded-card mt-3 overflow-hidden bg-surface">
|
||||
<div class="flex items-center gap-2.5 px-4 py-2.5 bg-error/5 text-xs font-bold text-text">
|
||||
실패 처리
|
||||
<span class="text-[10px] font-semibold text-error">영구 실패 {failItems.length}건 — 자동 재시도 3회 소진, 수동 조치 대기</span>
|
||||
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (failOpen = false)}>닫기</button>
|
||||
</div>
|
||||
{#if failLoading}
|
||||
<p class="text-xs text-dim text-center py-4">불러오는 중…</p>
|
||||
{:else if failItems.length === 0}
|
||||
<p class="text-xs text-dim text-center py-4">영구 실패 항목 없음</p>
|
||||
{:else}
|
||||
{#each failGroups as g (g.key)}
|
||||
<div class="px-4 py-2.5 border-t border-default">
|
||||
<div class="flex items-center gap-2 flex-wrap text-xs font-bold text-text mb-1">
|
||||
{flowStageLabel(g.stage)} {g.items.length}건
|
||||
<span class="text-[10px] font-mono font-medium text-error bg-error/10 rounded px-1.5 py-px">{g.pattern}{g.items[0]?.error_message && g.items[0].error_message.length > 36 ? '…' : ''}</span>
|
||||
</div>
|
||||
{#each expanded[g.key] ? g.items : g.items.slice(0, 4) as it (it.id)}
|
||||
<div class="flex items-center gap-2.5 py-1 border-t border-dashed border-default/60 text-xs">
|
||||
<span class="flex-1 min-w-0 truncate text-text" title={it.title}>{it.title}</span>
|
||||
<span class="text-[10px] font-mono text-faint shrink-0 tabular-nums">시도 {it.attempts}/{it.max_attempts}</span>
|
||||
<span class="text-[10px] font-mono text-error shrink-0 max-w-[260px] truncate" title={it.error_message ?? ''}>{it.error_message ?? ''}</span>
|
||||
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds([it.id])}>재시도</button>
|
||||
<button class="text-[10px] font-bold border border-default text-faint rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds([it.id])}>건너뛰기</button>
|
||||
</div>
|
||||
{/each}
|
||||
{#if g.items.length > 4 && !expanded[g.key]}
|
||||
<button class="text-[10px] text-dim cursor-pointer hover:text-text mt-1" onclick={() => (expanded = { ...expanded, [g.key]: true })}>… 외 {g.items.length - 4}건 펼치기</button>
|
||||
{/if}
|
||||
{#if g.items.length > 1}
|
||||
<div class="flex gap-2 mt-1.5">
|
||||
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2.5 py-0.5 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds(g.items.map((x) => x.id))}>그룹 전체 재시도 ({g.items.length})</button>
|
||||
<button class="text-[10px] font-bold border border-default text-faint rounded px-2.5 py-0.5 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds(g.items.map((x) => x.id))}>그룹 전체 건너뛰기</button>
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
{/each}
|
||||
<p class="text-[10px] text-faint px-4 py-2 border-t border-default">
|
||||
재시도 = 시도 횟수 리셋 후 큐 재진입 (자동 재시도 3회 새로 부여) · 건너뛰기 = 이 단계 완료 처리(후속 단계 연쇄 없음, 감사 마킹) · 같은 오류가 반복되는 항목(빈 텍스트 등)은 건너뛰기 권장
|
||||
</p>
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
<style>
|
||||
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */
|
||||
.mtag-gpu { background: #e7eef6; color: #3b6ea5; }
|
||||
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
|
||||
.mtag-macbook { background: #f7eedd; color: #b07a10; }
|
||||
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
|
||||
.detail-frame { border-color: #3b6ea5; }
|
||||
.detail-head { background: #e7eef6; }
|
||||
</style>
|
||||
@@ -0,0 +1,31 @@
|
||||
<!--
|
||||
EidEvidenceCard — 이드 채팅 deep(검색) 답변의 근거 카드 (ds-eid-ask-absorb P1).
|
||||
|
||||
ReactResult.sources = {id, doc_id, title, score} (citation 번호 n 없음 — /ask 의 Citation 과
|
||||
다름) → 순서 기반 번호([1],[2]...). 1단계 카드 = 제목·출처·점수 (스니펫은 react_loop
|
||||
_result_payload items_src 에 없음 — 2단계 후보). 접이식 <details> 로 채팅 흐름 보존.
|
||||
디자인 토큰만 (CLAUDE.md lint:tokens).
|
||||
-->
|
||||
<script lang="ts">
|
||||
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
|
||||
let { sources, partial = false }: { sources: EidSource[]; partial?: boolean } = $props();
|
||||
</script>
|
||||
|
||||
{#if sources.length}
|
||||
<details class="mt-2 rounded-lg border border-default bg-surface text-xs max-w-[85%] sm:max-w-[75%]">
|
||||
<summary class="cursor-pointer px-3 py-2 text-dim hover:text-text select-none font-semibold">
|
||||
근거 {sources.length}개{partial ? ' · 부분 답변 (확정 근거 부족)' : ''}
|
||||
</summary>
|
||||
<ul class="px-3 pb-2.5 flex flex-col gap-1.5">
|
||||
{#each sources as src, i (src.id ?? i)}
|
||||
<li class="flex items-start gap-2">
|
||||
<span class="text-accent font-bold shrink-0">[{i + 1}]</span>
|
||||
<span class="flex-1 min-w-0 text-text break-words">{src.title || `문서 ${src.doc_id ?? '?'}`}</span>
|
||||
{#if typeof src.score === 'number'}
|
||||
<span class="text-faint shrink-0 tabular-nums">{src.score.toFixed(2)}</span>
|
||||
{/if}
|
||||
</li>
|
||||
{/each}
|
||||
</ul>
|
||||
</details>
|
||||
{/if}
|
||||
@@ -61,6 +61,10 @@ export interface QueueStageRow {
|
||||
pending: number;
|
||||
processing: number;
|
||||
failed: number;
|
||||
/** 최근 1시간 완료 — 노드 처리율·ETA 재료 (ds-board-engines-1) */
|
||||
done_1h: number;
|
||||
/** 최근 1시간 유입 — 유입 우세 판정 재료 (ds-board-engines-1) */
|
||||
created_1h: number;
|
||||
done_today: number;
|
||||
oldest_pending_age_sec: number | null;
|
||||
}
|
||||
@@ -72,3 +76,33 @@ export interface QueueOverview {
|
||||
stages: QueueStageRow[];
|
||||
totals: QueueTotals;
|
||||
}
|
||||
|
||||
/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */
|
||||
|
||||
export interface FailedItem {
|
||||
id: number;
|
||||
stage: string;
|
||||
document_id: number;
|
||||
title: string;
|
||||
attempts: number;
|
||||
max_attempts: number;
|
||||
error_message: string | null;
|
||||
failed_at: string | null;
|
||||
}
|
||||
|
||||
export interface FailedListResponse {
|
||||
items: FailedItem[];
|
||||
total: number;
|
||||
}
|
||||
|
||||
export interface RetryResponse {
|
||||
requested: number;
|
||||
retried: number;
|
||||
not_retried: number;
|
||||
}
|
||||
|
||||
export interface SkipResponse {
|
||||
requested: number;
|
||||
skipped: number;
|
||||
not_skipped: number;
|
||||
}
|
||||
|
||||
@@ -36,3 +36,82 @@ export function etaPhrase(minutes: number): string {
|
||||
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
|
||||
return `약 ${text}시간 후 소진 예상`;
|
||||
}
|
||||
|
||||
/** ETA 분 → 칩용 짧은 표기 ("약 4.6시간" / "약 12분") */
|
||||
export function etaShort(minutes: number): string {
|
||||
if (minutes < 60) return `약 ${Math.max(1, Math.round(minutes))}분`;
|
||||
const hours = minutes / 60;
|
||||
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
|
||||
return `약 ${text}시간`;
|
||||
}
|
||||
|
||||
/** 경과 초 → "N분 전 / N시간 전 / N일 전" */
|
||||
export function formatAgeSec(sec: number): string {
|
||||
if (sec < 3600) return `${Math.max(1, Math.round(sec / 60))}분 전`;
|
||||
if (sec < 86400) return `${Math.round(sec / 3600)}시간 전`;
|
||||
return `${Math.round(sec / 86400)}일 전`;
|
||||
}
|
||||
|
||||
/* ─── 흐름 보드 정적 매핑 (plan ds-board-engines-1) ───────────────────────────
|
||||
* stage → 흐름 노드 / 엔진(모델) / 소속 머신. API 는 머신 label 과 단계 사실만
|
||||
* 주고(raw 모델명 노출 금지 계약), 엔진·모델 표기는 여기 단일 지점이 책임진다.
|
||||
* ★ 모델/엔진 교체 시 이 블록 1곳만 수정 (예: 맥미니 모델 스왑).
|
||||
*/
|
||||
|
||||
export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
|
||||
|
||||
export interface FlowNodeDef {
|
||||
key: string;
|
||||
/** 노드 표시명 */
|
||||
label: string;
|
||||
/** 합산할 stage 키 (다중 = 같은 엔진 공유) */
|
||||
stages: string[];
|
||||
machine: FlowMachine;
|
||||
/** 엔진/모델 표시명 (FE 정적 — 모델 교체 시 여기 수정) */
|
||||
engine: string;
|
||||
/** 보조 표기 (서비스/워커명) */
|
||||
sub: string;
|
||||
}
|
||||
|
||||
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
|
||||
export const FLOW_NODES: FlowNodeDef[] = [
|
||||
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
|
||||
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
|
||||
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
|
||||
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
|
||||
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
|
||||
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
|
||||
];
|
||||
|
||||
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
|
||||
export const AUX_NODES: FlowNodeDef[] = [
|
||||
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
|
||||
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
|
||||
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
|
||||
];
|
||||
|
||||
/** 머신 스트립 메타 — 모델 표기 단일 지점 */
|
||||
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
|
||||
gpu: { label: 'GPU 서버', model: '특화 엔진' },
|
||||
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
|
||||
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
|
||||
};
|
||||
|
||||
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
|
||||
export const FLOW_STAGE_LABEL: Record<string, string> = {
|
||||
extract: '추출',
|
||||
classify: '분류',
|
||||
summarize: '요약',
|
||||
embed: '임베딩',
|
||||
chunk: '청크',
|
||||
preview: '미리보기',
|
||||
stt: '전사',
|
||||
thumbnail: '썸네일',
|
||||
deep_summary: '심층분석',
|
||||
markdown: '마크다운',
|
||||
fulltext: '전문',
|
||||
};
|
||||
|
||||
export function flowStageLabel(stage: string): string {
|
||||
return FLOW_STAGE_LABEL[stage] ?? stage;
|
||||
}
|
||||
|
||||
@@ -14,9 +14,7 @@
|
||||
import { user } from '$lib/stores/auth';
|
||||
import { api } from '$lib/api';
|
||||
import { queueOverview, refreshQueueOverview } from '$lib/stores/queueOverview';
|
||||
import {
|
||||
MACHINE_STATE_LABEL, machineChipClass, machineDotClass, formatRate, etaPhrase,
|
||||
} from '$lib/utils/queueDisplay';
|
||||
import ProcessingFlowBoard from '$lib/components/ProcessingFlowBoard.svelte';
|
||||
import type { QueueOverview } from '$lib/types/queue';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import Skeleton from '$lib/components/ui/Skeleton.svelte';
|
||||
@@ -460,65 +458,9 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- ═══ 처리 머신 보드 (안2) + ETA 라인 (안5 라이트) ═══ -->
|
||||
<!-- ═══ 처리 머신 보드 v2 — 파이프라인 흐름 + 상세 패널 + 실패 드로어 (ds-board-engines-1) ═══ -->
|
||||
{#if queue}
|
||||
<div class="mt-5">
|
||||
<div class="text-[11px] font-bold text-dim uppercase tracking-wider mb-3">처리 머신</div>
|
||||
<div class="grid grid-cols-1 md:grid-cols-3 gap-3">
|
||||
{#each queue.machines as m (m.key)}
|
||||
<div class="bg-surface border border-default rounded-card p-4">
|
||||
<!-- 헤더: 상태 dot + 라벨 + state 칩 -->
|
||||
<div class="flex items-center justify-between gap-2 mb-2">
|
||||
<span class="flex items-center gap-2 text-[13px] font-bold text-text min-w-0">
|
||||
<span class="w-2 h-2 rounded-full shrink-0 {machineDotClass(m.state)}"></span>
|
||||
<span class="truncate">{m.label}</span>
|
||||
</span>
|
||||
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(m.state)}">
|
||||
{MACHINE_STATE_LABEL[m.state]}
|
||||
</span>
|
||||
</div>
|
||||
<!-- 담당 단계 칩 -->
|
||||
{#if m.stages.length > 0}
|
||||
<div class="flex flex-wrap gap-1 mb-2.5">
|
||||
{#each m.stages as s (s)}
|
||||
<span class="text-[10px] font-semibold rounded-full px-2 py-0.5 bg-surface-hover text-dim">{queueStageLabel(s)}</span>
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
<!-- 대기 · 처리율 · 오늘 -->
|
||||
<div class="text-xs text-dim tabular-nums">
|
||||
대기 <strong class="text-text">{m.pending.toLocaleString()}</strong>
|
||||
· 처리율 <strong class="text-text">{formatRate(m.done_1h)}/h</strong>
|
||||
· 오늘 <strong class="text-text">{m.done_today.toLocaleString()}</strong>건
|
||||
</div>
|
||||
<!-- 맥북 보류 (sleep 등 자동 재개 대기) -->
|
||||
{#if m.key === 'macbook' && m.deferred_pending > 0}
|
||||
<div class="text-[11px] font-semibold text-warning mt-1.5 tabular-nums">보류 {m.deferred_pending.toLocaleString()}건 — 자동 재개 대기</div>
|
||||
{/if}
|
||||
<!-- 지금 처리 중인 문서 -->
|
||||
{#if m.current.length > 0}
|
||||
<div class="text-[11px] text-dim border-t border-dashed border-default mt-2.5 pt-2 truncate"
|
||||
title={m.current.map((c) => `${c.title} (${queueStageLabel(c.stage)})`).join(' · ')}>
|
||||
지금: {m.current[0].title} ({queueStageLabel(m.current[0].stage)}){m.current.length > 1 ? ` 외 ${m.current.length - 1}건` : ''}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
|
||||
<!-- ETA 한 줄 (안5 라이트 — 추정치) -->
|
||||
<div class="text-xs text-dim mt-2.5 px-1 tabular-nums"
|
||||
title="현재 페이스 기반 추정치 — 유입 변동 시 달라질 수 있습니다">
|
||||
요약 대기 <strong class="text-text">{queue.summarize_eta.pending.toLocaleString()}건</strong>
|
||||
— 소화 {formatRate(queue.summarize_eta.done_rate_1h)}/h
|
||||
· 유입 {formatRate(queue.summarize_eta.inflow_rate_1h)}/h
|
||||
{#if queue.summarize_eta.eta_minutes != null}
|
||||
· <span class="text-accent font-semibold">{etaPhrase(queue.summarize_eta.eta_minutes)}</span>
|
||||
{:else}
|
||||
· 유입 우세(백필 중)
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
<ProcessingFlowBoard overview={queue} />
|
||||
{/if}
|
||||
|
||||
<!-- ═══ 단계 상세 (기존 stage 테이블 — 접힘 강등, 실패 있을 때 자동 펼침) ═══ -->
|
||||
|
||||
@@ -17,6 +17,12 @@
|
||||
macbook_unavailable / substrate_degraded / 기타 detail). 자동 fallback
|
||||
금지 — 다른 모드로 자동 전환하지 않는다. 스트림 도중 중단 = 받은 부분
|
||||
유지 + 표시.
|
||||
- 대기 표시(첫 바이트 전): 경과 타이머 1초 갱신 + 3초 후 GET /api/eid/status
|
||||
1회·이후 8초 간격 재조회(실패는 조용히 무시 — 기능 비차단)로 "대기"와
|
||||
"고장"을 정직하게 구분. daily.busy=true 면 줄 서는 중 안내. 15초 경과 +
|
||||
daily 모드면 [심층으로 전환]/[취소] 버튼 노출 — 전환은 명시 클릭만
|
||||
(자동 fallback 금지 정책 위반 아님). 첫 바이트 도착/스트림 종료 시
|
||||
타이머·폴링 즉시 정리.
|
||||
- 이력: localStorage `eid_chat:v1` (키 상수는 $lib/eidChat — logout 시 제거와 공유).
|
||||
전송 payload 는 마지막 20턴(40 messages) cap.
|
||||
- 입력 한도: 메시지당 8,000자 클라 선차단(서버 422 검증과 동일 한도).
|
||||
@@ -25,15 +31,25 @@
|
||||
-->
|
||||
<script lang="ts">
|
||||
import { onMount, onDestroy } from 'svelte';
|
||||
import { apiFetchRaw } from '$lib/api';
|
||||
import { api, apiFetchRaw } from '$lib/api';
|
||||
import { EID_CHAT_STORAGE_KEY } from '$lib/eidChat';
|
||||
import Button from '$lib/components/ui/Button.svelte';
|
||||
import EmptyState from '$lib/components/ui/EmptyState.svelte';
|
||||
import EidEvidenceCard from '$lib/components/eid/EidEvidenceCard.svelte';
|
||||
import { MessageCircle, SendHorizontal, RotateCcw, AlertCircle } from 'lucide-svelte';
|
||||
|
||||
type ChatMode = 'daily' | 'deep';
|
||||
type ChatMessage = { role: 'user' | 'assistant'; content: string };
|
||||
// deep(검색) 답변은 sources(근거)·partial 동반. daily 답변은 없음.
|
||||
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
|
||||
type ChatMessage = {
|
||||
role: 'user' | 'assistant';
|
||||
content: string;
|
||||
sources?: EidSource[];
|
||||
partial?: boolean;
|
||||
};
|
||||
type Notice = { kind: 'warn' | 'error'; message: string; retryable: boolean };
|
||||
// GET /api/eid/status 응답 — 대기 중 바쁨 신호 조회에 필요한 필드만 좁게 정의
|
||||
type EidStatus = { daily?: { busy?: boolean } };
|
||||
|
||||
// 이력 키 — logout(stores/auth.ts) 의 이력 제거와 단일 상수 공유
|
||||
const STORAGE_KEY = EID_CHAT_STORAGE_KEY;
|
||||
@@ -45,6 +61,10 @@
|
||||
const MAX_MESSAGE_CHARS = 8000;
|
||||
// 한도 근접 카운터 노출 시작점
|
||||
const COUNTER_THRESHOLD = 7500;
|
||||
// 대기 표시(첫 바이트 전): 상태 폴링 시작 시점(초) / 재조회 간격(초) / 행동 버튼 노출 시점(초)
|
||||
const STATUS_POLL_START_SEC = 3;
|
||||
const STATUS_POLL_INTERVAL_SEC = 8;
|
||||
const WAIT_ACTIONS_SEC = 15;
|
||||
|
||||
const DEEP_CAPTION =
|
||||
'장문·무거운 질문에 적합 — 잠들어 있으면 자동 기동 (처음 응답까지 최대 ~1분)';
|
||||
@@ -64,11 +84,72 @@
|
||||
let streaming = $state(false);
|
||||
let streamingText = $state('');
|
||||
let notice = $state<Notice | null>(null);
|
||||
// deep(검색) 모드 첫 바이트 전 단계 — 'searching' 이면 대기 표시를 "근거 검색 중"으로
|
||||
let deepPhase = $state<'searching' | null>(null);
|
||||
|
||||
let scrollEl: HTMLDivElement | undefined = $state();
|
||||
let textareaEl: HTMLTextAreaElement | undefined = $state();
|
||||
let abortCtrl: AbortController | null = null;
|
||||
|
||||
// ── 대기 추적 (첫 바이트 전) ────────────────────────
|
||||
// 경과 초 + daily 엔진 바쁨 여부(null = 미확인). 토큰(세대 카운터)으로
|
||||
// 스트림별 소유를 구분 — abort 직후 즉시 재전송(심층 전환) 경로에서
|
||||
// 이전 스트림의 늦은 정리가 새 스트림의 타이머를 죽이지 않게 한다.
|
||||
let waitSeconds = $state(0);
|
||||
let dailyBusy = $state<boolean | null>(null);
|
||||
let waitIntervalId: ReturnType<typeof setInterval> | null = null;
|
||||
let waitTokenSeq = 0;
|
||||
let waitToken = 0; // 현재 활성 추적 토큰 (0 = 추적 없음)
|
||||
|
||||
function startWaitTracking(streamMode: ChatMode): number {
|
||||
// 이전 추적 잔여 정리 (전환 재전송처럼 stop 전에 start 가 오는 경로 방어)
|
||||
if (waitIntervalId !== null) {
|
||||
clearInterval(waitIntervalId);
|
||||
waitIntervalId = null;
|
||||
}
|
||||
const token = ++waitTokenSeq;
|
||||
waitToken = token;
|
||||
waitSeconds = 0;
|
||||
dailyBusy = null;
|
||||
waitIntervalId = setInterval(() => {
|
||||
if (waitToken !== token) return; // 정리 누락 방어 — 무해 no-op
|
||||
waitSeconds += 1;
|
||||
// 바쁨 신호 폴링: 3초 경과 시 1회 + 이후 8초 간격 (3, 11, 19, ...).
|
||||
// daily 모드 전용 — deep 대기는 기존 wake 안내 + 경과 타이머만.
|
||||
if (
|
||||
streamMode === 'daily' &&
|
||||
waitSeconds >= STATUS_POLL_START_SEC &&
|
||||
(waitSeconds - STATUS_POLL_START_SEC) % STATUS_POLL_INTERVAL_SEC === 0
|
||||
) {
|
||||
void pollEidStatus(token);
|
||||
}
|
||||
}, 1000);
|
||||
return token;
|
||||
}
|
||||
|
||||
// token 가드: 본인 소유 추적만 정리 — 다른 스트림이 이어받았으면 no-op
|
||||
function stopWaitTracking(token: number) {
|
||||
if (token !== waitToken) return;
|
||||
waitToken = 0;
|
||||
if (waitIntervalId !== null) {
|
||||
clearInterval(waitIntervalId);
|
||||
waitIntervalId = null;
|
||||
}
|
||||
waitSeconds = 0;
|
||||
dailyBusy = null;
|
||||
}
|
||||
|
||||
// 상태 조회 — 실패는 조용히 무시 (대기 표시는 타이머만으로 유지, 기능 비차단)
|
||||
async function pollEidStatus(token: number) {
|
||||
try {
|
||||
const status = await api<EidStatus>('/eid/status');
|
||||
if (token !== waitToken) return; // 스트림 종료/교체 후 도착한 늦은 응답 폐기
|
||||
dailyBusy = status?.daily?.busy === true;
|
||||
} catch {
|
||||
// 무시 — 바쁨 신호는 부가 정보일 뿐 채팅 기능을 차단하지 않는다
|
||||
}
|
||||
}
|
||||
|
||||
// ── localStorage 이력 ───────────────────────────────
|
||||
function persist() {
|
||||
if (typeof window === 'undefined') return;
|
||||
@@ -97,9 +178,15 @@
|
||||
typeof (m as ChatMessage).content === 'string'
|
||||
)
|
||||
// 배열 크기 가드 + content 8,000자 clamp — 외부에서 손상/비대해진
|
||||
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화
|
||||
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화.
|
||||
// sources/partial(deep 답변 근거)은 보존 — 전송 payload 엔 안 실림(runStream map 이 role/content 만).
|
||||
.slice(-MAX_STORED_MESSAGES)
|
||||
.map((m) => ({ role: m.role, content: m.content.slice(0, MAX_MESSAGE_CHARS) }));
|
||||
.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content.slice(0, MAX_MESSAGE_CHARS),
|
||||
sources: Array.isArray((m as ChatMessage).sources) ? (m as ChatMessage).sources : undefined,
|
||||
partial: (m as ChatMessage).partial === true || undefined,
|
||||
}));
|
||||
}
|
||||
} catch {
|
||||
// 손상된 이력은 무시 (새 대화로 시작)
|
||||
@@ -107,7 +194,11 @@
|
||||
}
|
||||
|
||||
onMount(() => restore());
|
||||
onDestroy(() => abortCtrl?.abort());
|
||||
onDestroy(() => {
|
||||
abortCtrl?.abort();
|
||||
// 페이지 이탈 시 대기 타이머/폴링 정리 (abort 의 finally 와 이중이어도 무해)
|
||||
if (waitIntervalId !== null) clearInterval(waitIntervalId);
|
||||
});
|
||||
|
||||
// ── 자동 스크롤 (새 메시지 / 스트림 청크마다 하단 고정) ──
|
||||
$effect(() => {
|
||||
@@ -235,12 +326,39 @@
|
||||
void runStream();
|
||||
}
|
||||
|
||||
// ── 대기 중 행동 버튼 (daily + 15초 경과) ────────────
|
||||
// [심층으로 전환] — 명시 클릭에 의한 모드 전환 (자동 fallback 금지 정책
|
||||
// 위반 아님). 현재 fetch abort → 같은 user 턴을 mode=deep 으로 즉시 재전송.
|
||||
// abort 된 이전 스트림의 finally 는 abortCtrl 비교 + 대기 token 가드로
|
||||
// 새 스트림 상태를 건드리지 않는다 (새 대화 abort race 가드와 동일 구조).
|
||||
function switchToDeep() {
|
||||
if (!streaming || mode !== 'daily') return;
|
||||
mode = 'deep'; // 모드 토글 상태도 deep 으로 갱신
|
||||
abortCtrl?.abort();
|
||||
void runStream();
|
||||
}
|
||||
|
||||
// [취소] — abort 후 방금 push 한 user 턴 pop + 입력창 본문 복원
|
||||
// (422 처리와 동일 패턴: 이력 오염 차단 + localStorage 재저장).
|
||||
// placeholder 제거는 abort 된 스트림의 finally(streaming=false)가 처리.
|
||||
function cancelWait() {
|
||||
if (!streaming) return;
|
||||
abortCtrl?.abort();
|
||||
if (messages.length > 0 && messages[messages.length - 1].role === 'user') {
|
||||
const popped = messages.pop();
|
||||
if (popped && !input) input = popped.content;
|
||||
persist();
|
||||
}
|
||||
}
|
||||
|
||||
async function runStream() {
|
||||
notice = null;
|
||||
streaming = true;
|
||||
streamingText = '';
|
||||
const ctrl = new AbortController();
|
||||
abortCtrl = ctrl;
|
||||
// 첫 바이트 전 대기 추적 시작 — 본 스트림 소유 토큰으로 정리 시점 제어
|
||||
const waitTok = startWaitTracking(mode);
|
||||
|
||||
const payload = {
|
||||
mode,
|
||||
@@ -251,6 +369,9 @@
|
||||
|
||||
let acc = '';
|
||||
let sawDone = false;
|
||||
// deep(검색) 답변 동반 데이터 — daily 는 안 옴
|
||||
let accSources: EidSource[] = [];
|
||||
let accPartial = false;
|
||||
|
||||
try {
|
||||
const res = await apiFetchRaw('/eid/chat', {
|
||||
@@ -301,9 +422,35 @@
|
||||
try {
|
||||
const obj = JSON.parse(data) as {
|
||||
choices?: Array<{ delta?: { content?: unknown } }>;
|
||||
phase?: string;
|
||||
error_reason?: string;
|
||||
eid_sources?: EidSource[];
|
||||
partial?: boolean;
|
||||
};
|
||||
// deep(검색) envelope 분기 — daily 응답엔 없음
|
||||
if (obj?.phase === 'ping') return false; // heartbeat — 무시
|
||||
if (obj?.phase === 'searching') {
|
||||
deepPhase = 'searching'; // 대기 표시를 "근거 검색 중"으로
|
||||
return false;
|
||||
}
|
||||
if (obj?.phase === 'error') {
|
||||
// in-stream 미가용/실패 — 받은 부분 유지 + 명시 표시 (자동 fallback 0).
|
||||
// 뒤따르는 [DONE] 이 sawDone 처리하므로 '중단' 오경보 없음.
|
||||
notice = mapErrorReason(obj.error_reason, '');
|
||||
return false;
|
||||
}
|
||||
if (Array.isArray(obj?.eid_sources)) {
|
||||
accSources = obj.eid_sources;
|
||||
accPartial = obj.partial === true;
|
||||
return false;
|
||||
}
|
||||
const piece = obj?.choices?.[0]?.delta?.content;
|
||||
if (typeof piece === 'string' && piece) {
|
||||
// 첫 바이트 도착 — 대기 타이머/폴링 제거, 기존 스트리밍 표시로 전환
|
||||
if (!acc) {
|
||||
stopWaitTracking(waitTok);
|
||||
deepPhase = null;
|
||||
}
|
||||
acc += piece;
|
||||
streamingText = acc;
|
||||
}
|
||||
@@ -356,7 +503,7 @@
|
||||
}
|
||||
} catch (err) {
|
||||
if ((err as Error)?.name === 'AbortError') {
|
||||
// 새 대화 등 사용자 의도 중단 — 안내 불필요
|
||||
// 새 대화 / 대기 취소 / 심층 전환 등 사용자 의도 중단 — 안내 불필요
|
||||
return;
|
||||
}
|
||||
// 스트림 도중 네트워크 에러 — 받은 부분 유지 + 표시
|
||||
@@ -368,14 +515,23 @@
|
||||
}
|
||||
: { kind: 'error', message: '요청에 실패했습니다 — 네트워크를 확인하세요.', retryable: true };
|
||||
} finally {
|
||||
// 스트림 종료 — 대기 타이머/폴링 정리. 첫 바이트에서 이미 정리됐거나
|
||||
// 전환 재전송으로 새 스트림이 추적을 이어받았으면 token 가드로 no-op.
|
||||
stopWaitTracking(waitTok);
|
||||
// abort(새 대화/페이지 이탈) 시에는 push 하지 않음 — 새 대화로 비운
|
||||
// messages 에 이전 스트림 잔여분이 흘러들어가는 race 방지.
|
||||
if (acc && !ctrl.signal.aborted) {
|
||||
messages.push({ role: 'assistant', content: acc });
|
||||
messages.push({
|
||||
role: 'assistant',
|
||||
content: acc,
|
||||
sources: accSources.length ? accSources : undefined,
|
||||
partial: accPartial || undefined,
|
||||
});
|
||||
}
|
||||
if (abortCtrl === ctrl) {
|
||||
streaming = false;
|
||||
streamingText = '';
|
||||
deepPhase = null;
|
||||
abortCtrl = null;
|
||||
}
|
||||
persist();
|
||||
@@ -398,6 +554,24 @@
|
||||
// 입력 길이(전송 기준 = trim 후) — 7,500자부터 카운터 노출, 8,000자 초과 차단
|
||||
let inputLength = $derived(input.trim().length);
|
||||
let overLimit = $derived(inputLength > MAX_MESSAGE_CHARS);
|
||||
|
||||
// 첫 바이트 전 placeholder 문구 — "대기"와 "고장"의 정직한 구분:
|
||||
// 바쁨 확인 = 줄 서는 중 / 비-바쁨 확인 = 생성 준비 중 / 미확인 = 응답 대기 중.
|
||||
// deep 모드는 폴링하지 않으므로 항상 미확인(타이머만) — wake 안내는 헤더 caption.
|
||||
let waitPlaceholder = $derived(
|
||||
deepPhase === 'searching'
|
||||
? `이드가 문서·뉴스에서 근거를 찾는 중 · ${waitSeconds}초`
|
||||
: dailyBusy === true
|
||||
? `엔진이 다른 작업을 처리하고 있어요 — 차례가 오면 바로 시작됩니다 (대기 ${waitSeconds}초)`
|
||||
: dailyBusy === false
|
||||
? `응답 생성 준비 중 · ${waitSeconds}초`
|
||||
: `응답 대기 중 · ${waitSeconds}초`
|
||||
);
|
||||
|
||||
// 행동 버튼 노출: daily 모드 + 첫 바이트 전 + 15초 경과
|
||||
let showWaitActions = $derived(
|
||||
streaming && !streamingText && mode === 'daily' && waitSeconds >= WAIT_ACTIONS_SEC
|
||||
);
|
||||
</script>
|
||||
|
||||
<svelte:head>
|
||||
@@ -473,25 +647,35 @@
|
||||
</div>
|
||||
</div>
|
||||
{:else}
|
||||
<div class="flex justify-start">
|
||||
<div class="flex flex-col items-start">
|
||||
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
|
||||
{msg.content}
|
||||
</div>
|
||||
{#if msg.sources?.length}
|
||||
<EidEvidenceCard sources={msg.sources} partial={msg.partial ?? false} />
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
{/each}
|
||||
|
||||
<!-- 스트리밍 중 assistant 부분 응답 -->
|
||||
<!-- 스트리밍 중 assistant 부분 응답 / 첫 바이트 전 대기 표시 -->
|
||||
{#if streaming}
|
||||
<div class="flex justify-start">
|
||||
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
|
||||
{#if streamingText}
|
||||
{streamingText}<span class="inline-block w-1.5 h-3.5 ml-0.5 align-middle bg-accent animate-pulse rounded-sm"></span>
|
||||
{:else}
|
||||
<span class="text-dim animate-pulse">응답 준비 중...</span>
|
||||
<span class="text-dim animate-pulse">{waitPlaceholder}</span>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
<!-- 대기 행동 버튼: daily + 15초 경과 — 전환은 명시 클릭만 (자동 fallback 금지) -->
|
||||
{#if showWaitActions}
|
||||
<div class="flex justify-start gap-2">
|
||||
<Button variant="secondary" size="sm" onclick={switchToDeep}>심층으로 전환</Button>
|
||||
<Button variant="ghost" size="sm" onclick={cancelWait}>취소</Button>
|
||||
</div>
|
||||
{/if}
|
||||
{/if}
|
||||
|
||||
<!-- 에러/안내 카드: 자동 fallback 없이 명시 표시만 -->
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
|
||||
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
|
||||
CREATE TABLE IF NOT EXISTS documents_cand_qwen06 (
|
||||
doc_id BIGINT PRIMARY KEY,
|
||||
embed_input_hash TEXT,
|
||||
embedding vector(1024) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,10 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
|
||||
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen06 (
|
||||
id BIGINT PRIMARY KEY,
|
||||
doc_id BIGINT NOT NULL,
|
||||
chunk_index INTEGER,
|
||||
section_title TEXT,
|
||||
text TEXT,
|
||||
embedding vector(1024) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
|
||||
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
|
||||
CREATE TABLE IF NOT EXISTS documents_cand_qwen4 (
|
||||
doc_id BIGINT PRIMARY KEY,
|
||||
embed_input_hash TEXT,
|
||||
embedding vector(2560) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,10 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
|
||||
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4 (
|
||||
id BIGINT PRIMARY KEY,
|
||||
doc_id BIGINT NOT NULL,
|
||||
chunk_index INTEGER,
|
||||
section_title TEXT,
|
||||
text TEXT,
|
||||
embedding vector(2560) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
|
||||
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
|
||||
CREATE TABLE IF NOT EXISTS documents_cand_qwen4m (
|
||||
doc_id BIGINT PRIMARY KEY,
|
||||
embed_input_hash TEXT,
|
||||
embedding vector(1024) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,10 @@
|
||||
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
|
||||
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4m (
|
||||
id BIGINT PRIMARY KEY,
|
||||
doc_id BIGINT NOT NULL,
|
||||
chunk_index INTEGER,
|
||||
section_title TEXT,
|
||||
text TEXT,
|
||||
embedding vector(1024) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
@@ -0,0 +1,152 @@
|
||||
"""POST /api/eid/chat mode=deep — ReAct 자동검색 SSE 변환 (ds-eid-ask-absorb P1).
|
||||
|
||||
★ DB·LLM 0: get_session/get_current_user dependency override, probe·agentic_ask_loop·
|
||||
get_backend monkeypatch. 실제 검색·27B 호출 없음.
|
||||
★ 검증: 검색성→phase:searching+content+eid_sources+DONE / probe 실패→503 /
|
||||
mid-stream BackendUnavailable→in-stream error envelope / 대화성→sources 빈.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from fastapi import FastAPI
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
|
||||
|
||||
import api.eid_chat as eid_chat # noqa: E402
|
||||
from api.eid_chat import router as eid_chat_router # noqa: E402
|
||||
from core.auth import get_current_user # noqa: E402
|
||||
from core.database import get_session # noqa: E402
|
||||
from services.llm.backends import BackendUnavailable # noqa: E402
|
||||
from services.search.react_loop import ReactResult # noqa: E402
|
||||
|
||||
_DEEP = {"mode": "deep", "messages": [{"role": "user", "content": "콜드박스 위험성평가 찾아줘"}]}
|
||||
|
||||
|
||||
async def _async_true() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
async def _async_false() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _build_app() -> FastAPI:
|
||||
app = FastAPI()
|
||||
app.include_router(eid_chat_router, prefix="/api/eid")
|
||||
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
|
||||
id=1, username="test-user"
|
||||
)
|
||||
|
||||
async def _fake_session():
|
||||
yield None # deep 경로는 session 을 agentic_ask_loop 에 넘기기만(여기선 monkeypatch)
|
||||
|
||||
app.dependency_overrides[get_session] = _fake_session
|
||||
return app
|
||||
|
||||
|
||||
def _data_objs(raw: bytes) -> list[dict]:
|
||||
out: list[dict] = []
|
||||
for line in raw.split(b"\n"):
|
||||
if line.startswith(b"data: ") and line[len(b"data: "):].strip() != b"[DONE]":
|
||||
try:
|
||||
out.append(json.loads(line[len(b"data: "):]))
|
||||
except Exception:
|
||||
pass
|
||||
return out
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client():
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=_build_app()), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _rules_present(monkeypatch):
|
||||
# D-6 fail-closed 가드 통과 (substrate degraded 아님)
|
||||
monkeypatch.setattr(eid_chat.eid_compose, "rules_present", lambda: True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deep_search_sse_shape(client, monkeypatch):
|
||||
"""검색성 질문 → phase:searching + final content + eid_sources + DONE 순서."""
|
||||
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
|
||||
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
|
||||
|
||||
async def _fake_loop(session, query, *, backend, **kw):
|
||||
return ReactResult(
|
||||
final_answer="콜드박스 위험성평가는 TK-RA-2026-OT1-01 입니다.",
|
||||
iterations=1,
|
||||
partial=False,
|
||||
sources=[{"id": 1, "doc_id": 10, "title": "OT1 콜드박스 위험성평가", "score": 0.91}],
|
||||
)
|
||||
|
||||
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fake_loop)
|
||||
|
||||
r = await client.post("/api/eid/chat", json=_DEEP)
|
||||
assert r.status_code == 200
|
||||
objs = _data_objs(r.content)
|
||||
assert "searching" in [o.get("phase") for o in objs if "phase" in o]
|
||||
content = "".join(
|
||||
o["choices"][0]["delta"]["content"] for o in objs if "choices" in o
|
||||
)
|
||||
assert "OT1-01" in content
|
||||
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
|
||||
assert srcs and srcs[0][0]["title"] == "OT1 콜드박스 위험성평가"
|
||||
assert b"data: [DONE]" in r.content
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deep_conversational_no_sources(client, monkeypatch):
|
||||
"""대화성(검색 불요) → ReAct early-exit, sources 빈 배열."""
|
||||
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
|
||||
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
|
||||
|
||||
async def _chat_loop(session, query, *, backend, **kw):
|
||||
return ReactResult(final_answer="안녕하세요, 이드입니다.", iterations=1, partial=False, sources=[])
|
||||
|
||||
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _chat_loop)
|
||||
|
||||
r = await client.post("/api/eid/chat", json=_DEEP)
|
||||
assert r.status_code == 200
|
||||
objs = _data_objs(r.content)
|
||||
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
|
||||
assert srcs and srcs[0] == [] # 검색 안 함 = 근거 카드 안 뜸
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deep_probe_fail_503(client, monkeypatch):
|
||||
"""probe 실패(router 미도달) → 첫 바이트 전 503 router_unreachable."""
|
||||
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_false)
|
||||
r = await client.post("/api/eid/chat", json=_DEEP)
|
||||
assert r.status_code == 503
|
||||
assert r.json()["error_reason"] == "router_unreachable"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deep_midstream_error_envelope(client, monkeypatch):
|
||||
"""검색 중 BackendUnavailable(AC 분리 등) → in-stream error envelope + DONE."""
|
||||
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
|
||||
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
|
||||
|
||||
async def _fail_loop(session, query, *, backend, **kw):
|
||||
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
|
||||
|
||||
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fail_loop)
|
||||
|
||||
r = await client.post("/api/eid/chat", json=_DEEP)
|
||||
assert r.status_code == 200 # 스트림 이미 시작(probe 통과) → 200 + in-stream error
|
||||
objs = _data_objs(r.content)
|
||||
errs = [o for o in objs if o.get("phase") == "error"]
|
||||
assert errs and errs[0]["error_reason"] == "macbook_unavailable"
|
||||
assert b"data: [DONE]" in r.content
|
||||
@@ -131,6 +131,8 @@ async def test_503_substrate_degraded(client, monkeypatch):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_503_backend_unavailable_prestream(client, monkeypatch):
|
||||
# call_stream 회귀(prestream 503)는 daily 로 검증 — deep 은 이제 ReAct 별 경로
|
||||
# (probe·agentic_ask_loop), deep 의 503/midstream 은 test_eid_chat_deep.py 가 커버.
|
||||
async def fake_call_stream(self, mode, messages, system):
|
||||
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
|
||||
yield b"" # pragma: no cover — async generator 형태 유지용
|
||||
@@ -138,7 +140,7 @@ async def test_503_backend_unavailable_prestream(client, monkeypatch):
|
||||
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
|
||||
r = await client.post(
|
||||
"/api/eid/chat",
|
||||
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
|
||||
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
|
||||
)
|
||||
assert r.status_code == 503
|
||||
js = r.json()
|
||||
@@ -192,9 +194,11 @@ async def test_200_midstream_abort_quiet(client, monkeypatch):
|
||||
raise BackendUnavailable("qwen-macbook", "stream_deadline_exceeded")
|
||||
|
||||
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
|
||||
# call_stream midstream 회귀는 daily 로 — deep midstream 은 in-stream error envelope
|
||||
# 경로(test_eid_chat_deep.test_deep_midstream_error_envelope)로 분리됨.
|
||||
r = await client.post(
|
||||
"/api/eid/chat",
|
||||
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
|
||||
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.content == b'data: {"x": 1}\n\n'
|
||||
|
||||
@@ -104,7 +104,7 @@ async def test_anthropic_router_url_blocked(monkeypatch):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
|
||||
"""deep → qwen-macbook alias, system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
|
||||
"""deep → mac-mini-default alias (맥북 백지화 2026-06-11), system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
|
||||
seen: dict = {}
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
@@ -139,7 +139,7 @@ async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
|
||||
]
|
||||
assert seen["url"].endswith("/v1/chat/completions")
|
||||
body = seen["json"]
|
||||
assert body["model"] == "qwen-macbook"
|
||||
assert body["model"] == "mac-mini-default"
|
||||
assert body["stream"] is True
|
||||
assert body["max_tokens"] == 2048
|
||||
assert body["temperature"] == 0.4
|
||||
@@ -202,7 +202,7 @@ async def test_prestream_503_maps_reason(monkeypatch):
|
||||
with pytest.raises(BackendUnavailable) as ei:
|
||||
await anext(stream)
|
||||
assert ei.value.reason == "macbook_unavailable"
|
||||
assert ei.value.backend_name == "qwen-macbook"
|
||||
assert ei.value.backend_name == "mac-mini-default"
|
||||
finally:
|
||||
await c.close()
|
||||
|
||||
@@ -253,7 +253,7 @@ async def test_prestream_400_raises_valueerror_failloud(monkeypatch):
|
||||
c = EidAIClient()
|
||||
try:
|
||||
stream = c.call_stream("deep", _MSG, "sys")
|
||||
with pytest.raises(ValueError, match="router rejected alias='qwen-macbook'"):
|
||||
with pytest.raises(ValueError, match="router rejected alias='mac-mini-default'"):
|
||||
await anext(stream)
|
||||
finally:
|
||||
await c.close()
|
||||
@@ -290,7 +290,7 @@ async def test_stream_deadline_exceeded(monkeypatch):
|
||||
async for _ in stream:
|
||||
pass
|
||||
assert ei.value.reason == "stream_deadline_exceeded"
|
||||
assert ei.value.backend_name == "qwen-macbook"
|
||||
assert ei.value.backend_name == "mac-mini-default"
|
||||
finally:
|
||||
await c.close()
|
||||
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
"""GET /api/eid/status endpoint 테스트 — inline ASGI app (DB 의존 0).
|
||||
|
||||
★ 실행 환경: fastapi + httpx 필요 → test_eid_chat_endpoint.py 동일 idiom.
|
||||
★ DB 0 / LLM 0: get_current_user 는 dependency_overrides 로 대체, gate 점유는
|
||||
llm_gate.gate_status monkeypatch (eid_chat 이 모듈 attribute 로 호출하므로 유효).
|
||||
★ 무인증 케이스는 실제 auth 경로지만 decode 단계에서 거부돼 DB 접근 전 반환.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from fastapi import FastAPI
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
|
||||
|
||||
from api.eid_chat import router as eid_chat_router # noqa: E402
|
||||
from core.auth import get_current_user # noqa: E402
|
||||
from services.search import llm_gate # noqa: E402
|
||||
|
||||
|
||||
def _build_app(*, override_auth: bool = True) -> FastAPI:
|
||||
"""main.py 등록 방식과 동일 prefix(/api/eid)로 라우터만 올린 inline app."""
|
||||
app = FastAPI()
|
||||
app.include_router(eid_chat_router, prefix="/api/eid")
|
||||
if override_auth:
|
||||
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
|
||||
id=1, username="test-user"
|
||||
)
|
||||
return app
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client():
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=_build_app()), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
# ── 401 무인증 ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unauthenticated_rejected():
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=_build_app(override_auth=False)),
|
||||
base_url="http://test",
|
||||
) as ac:
|
||||
# 헤더 자체 부재 — HTTPBearer 단계 거부 (fastapi 기본 403, 버전별 401 허용)
|
||||
r = await ac.get("/api/eid/status")
|
||||
assert r.status_code in (401, 403)
|
||||
# 위조 토큰 — decode_token 실패 → 401 (DB 접근 전 거부)
|
||||
r2 = await ac.get(
|
||||
"/api/eid/status", headers={"Authorization": "Bearer bogus-token"}
|
||||
)
|
||||
assert r2.status_code == 401
|
||||
|
||||
|
||||
# ── 200 shape ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_200_shape(client, monkeypatch):
|
||||
"""응답 shape — daily 키 아래 busy/inflight/waiters 3필드, 타입 고정."""
|
||||
monkeypatch.setattr(
|
||||
llm_gate, "gate_status", lambda: {"inflight": False, "waiters": 0}
|
||||
)
|
||||
r = await client.get("/api/eid/status")
|
||||
assert r.status_code == 200, r.text
|
||||
js = r.json()
|
||||
assert set(js.keys()) == {"daily"}
|
||||
assert set(js["daily"].keys()) == {"busy", "inflight", "waiters"}
|
||||
assert isinstance(js["daily"]["busy"], bool)
|
||||
assert isinstance(js["daily"]["inflight"], bool)
|
||||
assert isinstance(js["daily"]["waiters"], int)
|
||||
|
||||
|
||||
# ── busy 판정 — gate_status monkeypatch ──────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"snap, expected",
|
||||
[
|
||||
# 유휴 — busy=false (근사: 외부 소비자 점유는 미포착)
|
||||
(
|
||||
{"inflight": False, "waiters": 0},
|
||||
{"busy": False, "inflight": False, "waiters": 0},
|
||||
),
|
||||
# inflight 만 — busy=true (확실)
|
||||
(
|
||||
{"inflight": True, "waiters": 0},
|
||||
{"busy": True, "inflight": True, "waiters": 0},
|
||||
),
|
||||
# waiters 만 — busy=true (inflight or waiters>0 의 or 분기)
|
||||
(
|
||||
{"inflight": False, "waiters": 3},
|
||||
{"busy": True, "inflight": False, "waiters": 3},
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_busy_from_gate_status(client, monkeypatch, snap, expected):
|
||||
monkeypatch.setattr(llm_gate, "gate_status", lambda: dict(snap))
|
||||
r = await client.get("/api/eid/status")
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.json() == {"daily": expected}
|
||||
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"_meta": {
|
||||
"plan": "embedding-phase2a-1 G-1",
|
||||
"measured_at": "2026-06-12",
|
||||
"serving": "Ollama 0.20.0 (GPU container `ollama`), endpoint = POST /api/embed (단일 고정 — legacy /api/embeddings 사용 금지)",
|
||||
"invariant": "저장=조회 동일 모델+버전, 프롬프트는 역할별 고정 (문서=plain / 쿼리=instruct prefix)"
|
||||
},
|
||||
"instruct_prefix_pinned": "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: ",
|
||||
"models": {
|
||||
"qwen3-embedding:0.6b": {
|
||||
"digest": "ac6da0dfba84", "size": "639MB", "dim": 1024, "l2_normalized": true
|
||||
},
|
||||
"qwen3-embedding:4b": {
|
||||
"digest": "df5bd2e3c74c", "size": "2.5GB(Q4)", "dim": 2560, "l2_normalized": true,
|
||||
"mrl_dimensions_option": {"supported": true, "dimensions=1024": {"dim": 1024, "l2_norm": 1.0, "note": "Ollama 가 truncate+재정규화까지 수행 — 쿼리측 MRL 은 dimensions 옵션으로 처리"}}
|
||||
}
|
||||
},
|
||||
"asymmetric_prefix_effect_0.6b": {
|
||||
"doc": "압력용기의 수압시험은 설계압력의 1.3배로 실시하며, 시험 중 용접부 누설 여부를 육안으로 확인한다.",
|
||||
"query": "압력용기 수압시험 기준 압력은?",
|
||||
"cos_doc_vs_query_plain": 0.7446,
|
||||
"cos_doc_vs_query_instruct": 0.7606,
|
||||
"cos_plain_vs_instruct_query": 0.882,
|
||||
"verdict": "prefix 가 쿼리 임베딩을 실질 변화시키고(0.882) 관련쌍 유사도를 올림(+0.016) — 비대칭 사용 필수"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
"""Phase 2A E-4 비교기 — baseline vs 후보 run CSV 들의 per-query 판정.
|
||||
|
||||
python tests/search_eval/compare_runs.py \
|
||||
--baseline baselines/<exact 재측정>.csv \
|
||||
--cand qwen06=<...>.csv --cand qwen4=<...>.csv --cand qwen4m=<...>.csv \
|
||||
[--epsilon 0.01] [--bootstrap 2000]
|
||||
|
||||
판정 출력(plan r3 E-4): 전체 graded NDCG 평균 delta · per-query win/loss/tie(|d|<ε=tie)
|
||||
· 부트스트랩 95% CI · 카테고리별 평균 · 상위 개선/퇴행 쿼리. failure_expected/error 행 제외.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import random
|
||||
import statistics
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load(path: str) -> dict[str, dict]:
|
||||
out = {}
|
||||
with Path(path).open(encoding="utf-8") as f:
|
||||
for row in csv.DictReader(f):
|
||||
if row.get("failure_expected", "").lower() in ("true", "1"):
|
||||
continue
|
||||
if row.get("error"):
|
||||
continue
|
||||
try:
|
||||
row["_g"] = float(row["graded_ndcg_at_10"])
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
out[row["id"]] = row
|
||||
return out
|
||||
|
||||
|
||||
def bootstrap_ci(deltas: list[float], n: int, seed: int = 42) -> tuple[float, float]:
|
||||
rng = random.Random(seed)
|
||||
means = sorted(
|
||||
statistics.mean(rng.choices(deltas, k=len(deltas))) for _ in range(n)
|
||||
)
|
||||
return means[int(0.025 * n)], means[int(0.975 * n)]
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("--baseline", required=True)
|
||||
p.add_argument("--cand", action="append", required=True, metavar="name=csv")
|
||||
p.add_argument("--epsilon", type=float, default=0.01)
|
||||
p.add_argument("--bootstrap", type=int, default=2000)
|
||||
a = p.parse_args()
|
||||
|
||||
base = load(a.baseline)
|
||||
print(f"baseline: {a.baseline} — scored {len(base)}, "
|
||||
f"graded NDCG mean {statistics.mean(r['_g'] for r in base.values()):.4f}")
|
||||
|
||||
for spec in a.cand:
|
||||
name, path = spec.split("=", 1)
|
||||
cand = load(path)
|
||||
ids = sorted(set(base) & set(cand))
|
||||
if len(ids) != len(base):
|
||||
print(f" ⚠ {name}: 교집합 {len(ids)} != baseline {len(base)} — 누락 쿼리 확인")
|
||||
deltas = [cand[i]["_g"] - base[i]["_g"] for i in ids]
|
||||
mean_b = statistics.mean(base[i]["_g"] for i in ids)
|
||||
mean_c = statistics.mean(cand[i]["_g"] for i in ids)
|
||||
win = sum(1 for d in deltas if d > a.epsilon)
|
||||
loss = sum(1 for d in deltas if d < -a.epsilon)
|
||||
tie = len(deltas) - win - loss
|
||||
lo, hi = bootstrap_ci(deltas, a.bootstrap)
|
||||
decided = win + loss
|
||||
win_rate = (win / decided * 100) if decided else 0.0
|
||||
|
||||
print(f"\n== {name} ==")
|
||||
print(f" graded NDCG: {mean_b:.4f} → {mean_c:.4f} (delta {mean_c-mean_b:+.4f}, "
|
||||
f"bootstrap95% [{lo:+.4f}, {hi:+.4f}])")
|
||||
print(f" per-query: win {win} / loss {loss} / tie {tie} (ε={a.epsilon}) — "
|
||||
f"win-rate(결정전) {win_rate:.0f}%")
|
||||
|
||||
cats: dict[str, list[float]] = {}
|
||||
for i in ids:
|
||||
cats.setdefault(base[i].get("category", "?"), []).append(
|
||||
cand[i]["_g"] - base[i]["_g"]
|
||||
)
|
||||
for c in sorted(cats):
|
||||
ds = cats[c]
|
||||
cb = statistics.mean(base[i]["_g"] for i in ids if base[i].get("category") == c)
|
||||
cc = statistics.mean(cand[i]["_g"] for i in ids if base[i].get("category") == c)
|
||||
print(f" {c:<18} {cb:.3f} → {cc:.3f} ({statistics.mean(ds):+.3f}, n={len(ds)})")
|
||||
|
||||
ranked = sorted(ids, key=lambda i: cand[i]["_g"] - base[i]["_g"])
|
||||
worst = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[:3]]
|
||||
best = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[-3:][::-1]]
|
||||
print(f" 개선 top3 {best} / 퇴행 top3 {worst}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,150 @@
|
||||
"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify.
|
||||
|
||||
worker-process 레벨(DB 필요)의 deep 폴백·classify drain 은 라이브 E2E 로 검증하고,
|
||||
여기서는 새 메커니즘의 seam 만 단위 검증한다.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from core.config import settings
|
||||
from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_gate(monkeypatch):
|
||||
monkeypatch.setattr(settings, "mlx_gate_concurrency", 2)
|
||||
_reset_for_test()
|
||||
yield
|
||||
_reset_for_test()
|
||||
|
||||
|
||||
# ─── gate capacity 2 ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_two_concurrent_holders_overlap():
|
||||
"""capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다."""
|
||||
log: list = []
|
||||
|
||||
async def hold(label: str):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
log.append(("in", label))
|
||||
await asyncio.sleep(0.05)
|
||||
log.append(("out", label))
|
||||
|
||||
await asyncio.gather(hold("a"), hold("b"))
|
||||
# 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명)
|
||||
assert log[0][0] == "in" and log[1][0] == "in"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_third_waits_until_slot_frees():
|
||||
"""capacity=2: 3번째는 대기, 첫 release 후 진입."""
|
||||
order: list = []
|
||||
release_a = asyncio.Event()
|
||||
|
||||
async def hold(label: str, wait_event: asyncio.Event | None):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
order.append(("in", label))
|
||||
if wait_event:
|
||||
await wait_event.wait()
|
||||
else:
|
||||
await asyncio.sleep(0.01)
|
||||
order.append(("out", label))
|
||||
|
||||
t_a = asyncio.create_task(hold("a", release_a))
|
||||
t_b = asyncio.create_task(hold("b", release_a))
|
||||
await asyncio.sleep(0.02) # a, b inflight 진입 대기
|
||||
assert ("in", "a") in order and ("in", "b") in order
|
||||
assert gate_status()["inflight"] == 2
|
||||
|
||||
t_c = asyncio.create_task(hold("c", None))
|
||||
await asyncio.sleep(0.02)
|
||||
assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기
|
||||
assert gate_status()["waiters"] == 1
|
||||
|
||||
release_a.set()
|
||||
await asyncio.gather(t_a, t_b, t_c)
|
||||
assert ("in", "c") in order
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_capacity_one_serializes():
|
||||
"""concurrency=1 이면 기존 직렬화 그대로 (무회귀)."""
|
||||
from core.config import settings as s
|
||||
|
||||
s_backup = s.mlx_gate_concurrency
|
||||
s.mlx_gate_concurrency = 1
|
||||
try:
|
||||
_reset_for_test()
|
||||
log: list = []
|
||||
|
||||
async def hold(label: str):
|
||||
async with acquire_mlx_gate(Priority.BACKGROUND):
|
||||
log.append(("in", label))
|
||||
await asyncio.sleep(0.02)
|
||||
log.append(("out", label))
|
||||
|
||||
await asyncio.gather(hold("a"), hold("b"))
|
||||
# 직렬: in/out 쌍이 겹치지 않음
|
||||
assert [e[0] for e in log] == ["in", "out", "in", "out"]
|
||||
finally:
|
||||
s.mlx_gate_concurrency = s_backup
|
||||
|
||||
|
||||
# ─── call_deep_or_defer cfg override ─────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_or_defer_cfg_override():
|
||||
"""cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출."""
|
||||
from ai.client import call_deep_or_defer
|
||||
|
||||
seen: dict = {}
|
||||
|
||||
class FakeClient:
|
||||
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
|
||||
|
||||
async def _request(self, cfg, prompt, system=None):
|
||||
seen["cfg"] = cfg
|
||||
return "ok"
|
||||
|
||||
async def call_deep(self, prompt, system=None):
|
||||
seen["cfg"] = self.ai.deep
|
||||
return "ok"
|
||||
|
||||
override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0)
|
||||
out = await call_deep_or_defer(FakeClient(), "p", cfg=override)
|
||||
assert out == "ok"
|
||||
assert seen["cfg"] is override
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_deep_or_defer_cfg_still_defers():
|
||||
"""cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용."""
|
||||
from ai.client import call_deep_or_defer
|
||||
from models.queue import StageDeferred
|
||||
|
||||
class FakeClient:
|
||||
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
|
||||
|
||||
async def _request(self, cfg, prompt, system=None):
|
||||
raise httpx.ConnectError("down")
|
||||
|
||||
with pytest.raises(StageDeferred):
|
||||
await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x"))
|
||||
|
||||
|
||||
# ─── drain stages ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_drain_stages_include_classify():
|
||||
from workers.queue_drain import DRAIN_STAGES
|
||||
|
||||
assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"}
|
||||
@@ -152,8 +152,9 @@ async def test_drain_requires_deep_slot(monkeypatch):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_rejects_non_drain_stage(monkeypatch):
|
||||
"""classify 는 2026-06-12 fair-share 로 DRAIN_STAGES 합류 — 거부 대상은 extract 등."""
|
||||
import workers.queue_drain as qd
|
||||
|
||||
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
|
||||
with pytest.raises(SystemExit):
|
||||
await qd.drain("classify", 1)
|
||||
await qd.drain("extract", 1)
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
"""Phase 2A (embedding-phase2a-1) — Qwen 후보 디스패처/쿼리 임베딩 단위 테스트."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from services.search import retrieval_service as rs
|
||||
|
||||
|
||||
def test_resolve_qwen_backends():
|
||||
for slug in ("cand_qwen06", "cand_qwen4", "cand_qwen4m"):
|
||||
cfg = rs._resolve_backend(slug)
|
||||
assert cfg["docs_table"].startswith("documents_cand_qwen")
|
||||
assert cfg["chunks_table"].startswith("document_chunks_cand_qwen")
|
||||
assert cfg["embed_kind"] == "ollama"
|
||||
# 테이블명이 2단계 SQL allowlist 도 통과해야 함 (R2-B1)
|
||||
assert rs._VALID_DOCS_TABLE.match(cfg["docs_table"])
|
||||
assert rs._VALID_CHUNKS_TABLE.match(cfg["chunks_table"])
|
||||
assert rs._resolve_backend("baseline") is None
|
||||
with pytest.raises(ValueError):
|
||||
rs._resolve_backend("cand_unknown")
|
||||
|
||||
|
||||
def test_qwen4m_has_mrl_dimensions():
|
||||
assert rs._resolve_backend("cand_qwen4m")["embed_dimensions"] == 1024
|
||||
assert "embed_dimensions" not in rs._resolve_backend("cand_qwen4")
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
def __init__(self, embs):
|
||||
self._embs = embs
|
||||
|
||||
def raise_for_status(self):
|
||||
return None
|
||||
|
||||
def json(self):
|
||||
return {"embeddings": self._embs}
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
"""httpx.AsyncClient 대역 — post body 캡처."""
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def __init__(self, *a, **k):
|
||||
pass
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *a):
|
||||
return False
|
||||
|
||||
async def post(self, url, json=None):
|
||||
_FakeClient.captured = {"url": url, "json": json}
|
||||
dim = (json or {}).get("dimensions") or 1024
|
||||
return _FakeResp([[0.1] * dim])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ollama_query_embed_applies_instruct_prefix(monkeypatch):
|
||||
import httpx
|
||||
|
||||
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
|
||||
cfg = rs._resolve_backend("cand_qwen06")
|
||||
out = await rs._embed_query_via_ollama(cfg, "압력용기 수압시험")
|
||||
assert out is not None and len(out) == 1024
|
||||
body = _FakeClient.captured["json"]
|
||||
assert body["model"] == "qwen3-embedding:0.6b"
|
||||
assert body["input"][0].startswith(rs.QWEN3_QUERY_INSTRUCT)
|
||||
assert body["input"][0].endswith("압력용기 수압시험")
|
||||
assert "dimensions" not in body
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ollama_query_embed_mrl_dimensions(monkeypatch):
|
||||
import httpx
|
||||
|
||||
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
|
||||
cfg = rs._resolve_backend("cand_qwen4m")
|
||||
out = await rs._embed_query_via_ollama(cfg, "q")
|
||||
assert _FakeClient.captured["json"]["dimensions"] == 1024
|
||||
assert len(out) == 1024
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ollama_query_embed_failure_returns_none(monkeypatch):
|
||||
import httpx
|
||||
|
||||
class _Boom(_FakeClient):
|
||||
async def post(self, url, json=None):
|
||||
raise httpx.ConnectError("down")
|
||||
|
||||
monkeypatch.setattr(httpx, "AsyncClient", _Boom)
|
||||
cfg = rs._resolve_backend("cand_qwen06")
|
||||
assert await rs._embed_query_via_ollama(cfg, "q") is None
|
||||
@@ -0,0 +1,202 @@
|
||||
"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트.
|
||||
|
||||
홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉).
|
||||
비-held 스테이지는 기존과 동일하게 처리된다.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from core.config import Settings, settings
|
||||
from workers import digest_worker, queue_consumer
|
||||
|
||||
|
||||
def _fake_consumer_env(monkeypatch, held):
|
||||
processed = []
|
||||
|
||||
async def fake_process(stage, worker):
|
||||
processed.append(stage)
|
||||
|
||||
async def fake_reset(stages, threshold):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(queue_consumer, "_process_stage", fake_process)
|
||||
monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset)
|
||||
monkeypatch.setattr(
|
||||
queue_consumer, "_load_workers",
|
||||
lambda: {
|
||||
s: object()
|
||||
for s in (queue_consumer.MAIN_QUEUE_STAGES
|
||||
+ queue_consumer.FAST_QUEUE_STAGES + ["markdown"])
|
||||
},
|
||||
)
|
||||
monkeypatch.setattr(queue_consumer, "_hold_logged", False)
|
||||
monkeypatch.setattr(settings, "pipeline_held_stages", held)
|
||||
return processed
|
||||
|
||||
|
||||
def test_settings_default_empty():
|
||||
"""미설정 시 빈 리스트 = 무동작 (기존 동작 무회귀)."""
|
||||
assert Settings().pipeline_held_stages == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_consume_queue_skips_held_stages(monkeypatch):
|
||||
processed = _fake_consumer_env(
|
||||
monkeypatch, ["classify", "summarize", "deep_summary"]
|
||||
)
|
||||
|
||||
await queue_consumer.consume_queue()
|
||||
|
||||
assert "classify" not in processed
|
||||
assert "summarize" not in processed
|
||||
assert "deep_summary" not in processed
|
||||
# 특화 스테이지는 계속 처리 (embed/chunk 는 2026-06-12 fast 컨슈머로 분리)
|
||||
for stage in ("extract", "stt", "fulltext"):
|
||||
assert stage in processed
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
|
||||
processed = _fake_consumer_env(monkeypatch, [])
|
||||
|
||||
await queue_consumer.consume_queue()
|
||||
|
||||
assert processed == list(queue_consumer.MAIN_QUEUE_STAGES)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
|
||||
"""fast 컨슈머(2026-06-12 분리) = embed/chunk 전용, LLM 사이클과 디커플."""
|
||||
processed = _fake_consumer_env(monkeypatch, [])
|
||||
|
||||
await queue_consumer.consume_fast_queue()
|
||||
|
||||
assert processed == ["embed", "chunk"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fast_consumer_respects_hold(monkeypatch):
|
||||
processed = _fake_consumer_env(monkeypatch, ["embed"])
|
||||
|
||||
await queue_consumer.consume_fast_queue()
|
||||
|
||||
assert processed == ["chunk"]
|
||||
|
||||
|
||||
def test_fast_split_invariants():
|
||||
"""세 컨슈머 stage 집합 disjoint + embed/chunk 배치 상향 회귀 가드."""
|
||||
main = set(queue_consumer.MAIN_QUEUE_STAGES)
|
||||
fast = set(queue_consumer.FAST_QUEUE_STAGES)
|
||||
md = set(queue_consumer.MARKDOWN_QUEUE_STAGES)
|
||||
assert not (main & fast) and not (main & md) and not (fast & md)
|
||||
assert fast == {"embed", "chunk"}
|
||||
assert queue_consumer.BATCH_SIZE["embed"] >= 10
|
||||
assert queue_consumer.BATCH_SIZE["chunk"] >= 10
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_markdown_consumer_not_held(monkeypatch):
|
||||
"""markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""
|
||||
processed = _fake_consumer_env(
|
||||
monkeypatch, ["classify", "summarize", "deep_summary", "digest"]
|
||||
)
|
||||
|
||||
await queue_consumer.consume_markdown_queue()
|
||||
|
||||
assert processed == ["markdown"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
|
||||
called = {"pipeline": False}
|
||||
|
||||
async def fake_pipeline():
|
||||
called["pipeline"] = True
|
||||
return {}
|
||||
|
||||
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
|
||||
monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])
|
||||
|
||||
await digest_worker.run()
|
||||
|
||||
assert called["pipeline"] is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
|
||||
called = {"pipeline": False}
|
||||
|
||||
async def fake_pipeline():
|
||||
called["pipeline"] = True
|
||||
return {"clusters": 0}
|
||||
|
||||
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
|
||||
monkeypatch.setattr(settings, "pipeline_held_stages", [])
|
||||
|
||||
await digest_worker.run()
|
||||
|
||||
assert called["pipeline"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
|
||||
from workers import briefing_worker
|
||||
|
||||
called = {"pipeline": False}
|
||||
|
||||
async def fake_pipeline(target_date):
|
||||
called["pipeline"] = True
|
||||
return {}
|
||||
|
||||
monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
|
||||
monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])
|
||||
|
||||
assert await briefing_worker.run() is None
|
||||
assert called["pipeline"] is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_study_explanation_consumer_held(monkeypatch):
|
||||
from workers import study_queue_consumer
|
||||
|
||||
touched = []
|
||||
|
||||
async def fake_reset():
|
||||
touched.append("reset")
|
||||
|
||||
monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
|
||||
monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])
|
||||
|
||||
await study_queue_consumer.consume_study_queue()
|
||||
|
||||
assert touched == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_study_consumers_held_no_db_touch(monkeypatch):
|
||||
"""held 시 stale reset 포함 DB 접근 0 — claim 미발생 실증."""
|
||||
from workers import study_memo_card_jobs_consumer, study_session_queue_consumer
|
||||
|
||||
touched = []
|
||||
|
||||
async def fake_reset_session():
|
||||
touched.append("session_reset")
|
||||
|
||||
async def fake_reset_card():
|
||||
touched.append("card_reset")
|
||||
|
||||
monkeypatch.setattr(
|
||||
study_session_queue_consumer, "reset_stale_session_jobs", fake_reset_session
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
study_memo_card_jobs_consumer, "reset_stale_card_jobs", fake_reset_card
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
settings, "pipeline_held_stages",
|
||||
["study_session_analysis", "study_memo_card"],
|
||||
)
|
||||
|
||||
await study_session_queue_consumer.consume_study_session_queue()
|
||||
await study_memo_card_jobs_consumer.consume_study_memo_card_queue()
|
||||
|
||||
assert touched == []
|
||||
@@ -20,8 +20,14 @@ from services.search.llm_gate import (
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_gate():
|
||||
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
|
||||
def _reset_gate(monkeypatch):
|
||||
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).
|
||||
|
||||
2026-06-12 capacity 일반화 이후 본 파일의 직렬화 가정 보존을 위해
|
||||
concurrency=1 로 고정 (capacity>1 동작은 test_fair_share.py 가 커버).
|
||||
"""
|
||||
from core.config import settings
|
||||
monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
|
||||
_reset_for_test()
|
||||
yield
|
||||
_reset_for_test()
|
||||
|
||||
@@ -379,5 +379,14 @@ def test_build_stages_order_fields_and_age():
|
||||
assert by["extract"]["failed"] == 2
|
||||
assert by["extract"]["oldest_pending_age_sec"] is None
|
||||
# 전 stage 행 존재 (빈 단계 숨김은 FE 몫)
|
||||
assert {"stage", "pending", "processing", "failed", "done_today",
|
||||
"oldest_pending_age_sec"} == set(rows[0].keys())
|
||||
assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h",
|
||||
"done_today", "oldest_pending_age_sec"} == set(rows[0].keys())
|
||||
|
||||
|
||||
def test_build_stages_exposes_rates():
|
||||
"""ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료."""
|
||||
from services.queue_overview import build_stages
|
||||
stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)}
|
||||
rows = build_stages(stats)
|
||||
embed = next(r for r in rows if r["stage"] == "embed")
|
||||
assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)
|
||||
|
||||
Reference in New Issue
Block a user