Compare commits

...

18 Commits

Author SHA1 Message Date
hyungi 5581d3f1ce feat(board): 처리 보드 v2 — 파이프라인 흐름 뷰·엔진 구분·실패 재시도/건너뛰기 (ds-board-engines-1)
- 흐름 뷰 메인: 좌→우 노드(머신·엔진 태그, 유입 우세 amber, 실패 뱃지) + 머신 스트립(모델 표기) + trend_24h 스파크라인 첫 렌더
- 노드 클릭 상세 패널: KV 4칸 + 다중 stage 행 + 지금 처리 중
- 실패 처리 드로어: 에러 패턴 그룹 + 재시도/건너뛰기 (영구 실패의 첫 사용자 조치 경로)
- API: stages[].done_1h/created_1h 노출 + GET /api/queue/failed + POST /api/queue/retry|/skip (uq_queue_active 충돌 skip, 건너뛰기는 enqueue_next_stage 미호출)
- 엔진/모델 표기 = queueDisplay.ts 정적 맵 단일 지점 (모델 교체 시 1곳)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 01:05:04 +00:00
hyungi 8ac1dbf4a8 test(eval): Phase 2A E-4 비교기 — per-query win/loss/tie(ε)·부트스트랩 CI·카테고리 분해
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:34:18 +09:00
hyungi c3d237766d feat(search): Phase 2A E-1 — Qwen 후보 3종 백필 CLI + eval 디스패처 확장 (마이그 328~333)
- 후보 섀도 테이블 6종(전부 vector 타입 — eval=exact scan 이라 인덱스 불요, halfvec 은 C-1 소관)
- workers/phase2a_cand_backfill: resumable(NOT EXISTS)·배치 커밋·동결셋 한정(--doc/chunk-id-max),
  문서/청크 입력 = production 경로 동일 구성 + plain
- CANDIDATE_BACKEND_MAP += cand_qwen06/qwen4/qwen4m (embed_kind=ollama, 쿼리측 instruct prefix
  G-1 핀 문자열, qwen4m = dimensions 1024 MRL)
- qwen4m 적재는 qwen4 에서 SQL 파생(subvector+l2_normalize) — 본 CLI 비대상

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:29:53 +09:00
hyungi 5bc68c95f6 test(eval): Phase 2A G-1 — Qwen3-Embedding 서빙 fixture 박제 (Ollama 0.20.0, /api/embed)
0.6b=1024d/4b=2560d 정규화 출력, MRL dimensions 옵션 지원(재정규화 포함),
비대칭 instruct prefix 효과 실측(+0.016), instruct 문자열 핀.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:14:24 +09:00
hyungi 5dca5b5d28 ops(pipeline): embed/chunk 고속 컨슈머 분리 + 배치 1→10 — LLM 사이클 인질 해소
진단(2026-06-12 용량 평가): 단일 루프에서 classify(~190s×3)가 사이클을 점유,
건당 <1s 인 embed/chunk 가 사이클당 1건 캡 → 실효 ~580/일 vs 수요 최대 2,700/일,
적체 3,570 + 신규 문서 벡터 미적재(RAG 검색 누락). 4070 가동률 0% = 순수 구조 캡.
수리 = markdown 분리(05-01) 선례: consume_fast_queue 1분 잡 + 배치 10(GPU 공유 보수값,
캡 ~14,400/일). 세 컨슈머 stage 집합 disjoint(stale reset 이중 복구 방지). retrieval
로직·임베딩 모델 무접촉.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:50:07 +09:00
hyungi 9c9ff6eeba test(drain): classify 합류 반영 — 거부 케이스를 extract 로 교체
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:22:47 +09:00
hyungi d667545185 fix(classify): 적대 리뷰 반영 — use_deep 스레딩(B1)·StageDeferred 전파(B2)·legacy 호출 deep 경유(M3)
- _run_tier_triage(use_deep) 스레딩 — 미배선 NameError(전 classify 파괴) fix
- process 의 triage try 에 except StageDeferred: raise 선행 (drain 보류 시멘틱 복구)
- legacy classify()/summarize() 에 cfg 파라미터 — use_deep 시 deep 슬롯 경유 +
  is_deferrable_error → StageDeferred 변환(첫 호출 = 최저비용 지점에서 보류, doc 쓰기 0)
- ai_model_version = 실제 처리 경로 모델 (drain=qwen-macbook 귀속)
- analyze_event model_name 스레딩 + deep triage cfg 에 top_p 동승

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:12:40 +09:00
hyungi 235bbf9881 ops(pipeline): fair-share 번들 — drain classify 합류 + deep 맥미니 폴백 + mlx 게이트 동시 2
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버:
- queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling,
  완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지)
- deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 —
  강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지
- llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) —
  'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제
- analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 06:56:02 +09:00
hyungi 30200a4e49 ops(ai): deep 슬롯 재도입 — 맥북 야간 night-drain 레버 (Qwen3.6-27B-6bit)
사용자 지시: 자기 전 night-drain 한 번 실행 → 맥북이 밤새 summarize/deep_summary 분담.
보류 시멘틱(StageDeferred)·drain CLI·라우터 wake preflight = 기존 검증 자산 재사용.
맥북 측 = RunAtLoad=false 수동 기동 + 서버 수명 한정 caffeinate + idle-watch 자동 종료.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 21:49:12 +09:00
hyungi eff2c3b7d3 ops(search): Qwen 27B 속도 반영 — synthesis 30s→120s, classifier 슬롯 모델 동승 교체
- config classifier 모델 gemma 잔존 = mlx 서버 Gemma 재로드(이중 적재) 위험 → Qwen 6bit 로 동승 교체
- synthesis 는 timeout 시 graceful skip 이 없는 답변 본체라 단독 상향 (classifier/query_analyzer/
  rewriter 의 30s/15s 캡은 초과 시 skip·원쿼리 폴백으로 degrade — 관찰 후 별도 튜닝)
- ask.backend.timeout_read_s 30→120 align

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:31:26 +09:00
hyungi 3d79002dfa ops(ai): Qwen 27B 프리필 실측(~112 tok/s) 반영 timeout 상향 — triage 480 / primary 900
장문(context_char_limit 상한급) 프리필이 수 분 걸려 기존 120/300s 로는 timeout 실패 churn.
단일 코루틴 컨슈머라 장문 1건이 사이클을 수 분 점유하는 것은 수용(관찰 후 배칭/컨텍스트 튜닝 PR).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:29:45 +09:00
hyungi 3d60008965 ops(ai)!: 맥미니 생성 모델 Qwen3.6-27B-6bit 전환 + 생성 LLM 홀드 해제
B안(사용자 2026-06-11): Gemma 26B-A4B → Qwen3.6-27B-6bit 풀교체.
- config.yaml triage/primary model 교체 + dense 감속 반영 timeout 상향(30→120/180→300)
- held_stages [] (홀드 해제 — 적체 자연 드레인, deep_summary 는 primary 복귀)
- eid deep 모드 = mac-mini-default 재지정(맥북 백지화). llm_gate '예외 없이 gate' invariant 에
  따라 deep 도 alias 조건으로 자동 게이트 (구 무게이트 = 맥북 별 endpoint 예외였음)
- deep probe 실패 reason = router_unreachable 로 정정 + 테스트 동기화
잔여(별 PR): ask 표면 qwen-macbook 옵션/백엔드 클래스/처리보드 맥북 카드 정리

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:19:35 +09:00
hyungi cd0040925a ops(pipeline): 생성 LLM 홀드 게이트 held_stages — 맥미니 모델 확정까지 보류
맥북 LLM 백지화 + 맥미니 모델 재결정에 따라 DS 의 생성 LLM 소비를 일괄 보류.
held = classify/summarize/deep_summary(큐, claim 미발생·attempts 미소모) +
digest(04:00)/briefing(05:10) cron + study explanation/session_analysis/memo_card 컨슈머.
GPU 특화 스테이지·수집기·인터랙티브(ask/eid chat)는 무영향. 기본값 [] = 무동작.
/api/digest/regenerate 는 홀드 중 409 명시. 해제 = config held_stages 비우고 fastapi 재기동.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 16:52:46 +09:00
hyungi fdac449a48 Merge pull request 'Feat/eid chat' (#35) from feat/eid-chat into main
Reviewed-on: #35
2026-06-11 15:14:43 +09:00
hyungi 40f5b5fe9e Merge pull request 'Feat/ds processing board' (#33) from feat/ds-processing-board into main
Reviewed-on: #33
2026-06-11 15:14:24 +09:00
hyungi 250896cdfa feat(eid): deep 모드 = ReAct 자동검색 + 근거 카드 (ds-eid-ask-absorb P1)
- deep 분기 _eid_chat_deep: 비생성 probe → phase:searching → agentic_ask_loop
  (tool_choice=auto 가 검색 여부 자율 판단, 검색 불요는 early-exit 대화) → final_answer
  + eid_sources envelope → DONE. heartbeat {phase:ping}(~10s, 프록시 idle timeout 차단)
  · mid-stream BackendUnavailable → in-stream error envelope · disconnect 시 task.cancel()
  + await(고아화·27B 점유 방지).
- daily = call_stream 무변경(맥미니 대화). deep = 맥북 27B ReAct (tool calling 27B 전용,
  맥미니 26B token-leak 미검증). 멀티턴 = 메시지 단독 처리(agentic_ask_loop query: str,
  history 2단계 백로그).
- EidEvidenceCard.svelte 접이식 근거 카드(sources 순서번호·제목·점수) + 프론트 SSE 파서
  확장(ping/searching/error/eid_sources) + 검색 중 표시 + 이력 보존.
- 테스트: deep 4건(검색성/대화성/probe-503/mid-stream-error) + 기존 call_stream 회귀 daily
  로 이전 = 29 passed.
- 동반(이전 eid-chat 세션 미커밋): /api/eid/status endpoint + llm_gate.gate_status +
  test_eid_status (채팅 대기 UI 의 '대기 vs 고장' 구분용, 5 passed).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:51:00 +09:00
hyungi 5e8b998a11 feat(documents): hier analyze 서브커맨드 — 재분해와 독립한 절분석 self-heal (g3-t3 갭)
re-decompose 의 char_start 완료마커는 'jump-target char_start 보유'라 컨테이너 recreate/deadline 으로
analyze 가 잘린 doc(char_start 있으나 일부 leaf 미분석)을 재선별 못 함 → rail summary 영구 미수렴 갭.
`analyze` 가 LEAF_SQL(미분석 leaf 보유) 기준 독립 선별로 수렴(멱등, --doc 제한 가능, jump 무관).
sweep 로그도 `analyze` 커맨드 안내로 갱신. (2026-06-10 백필서 recreate 로 잘린 5 doc·53 leaf 수동 처리한 케이스 항구화.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 06:32:10 +09:00
hyungi 53999b2825 fix(documents): g-measure junk 검출 all-caps 과탐 제거 + verdict=coarse 스크린 명시
전부-대문자 휴리스틱이 기술문서 정상 heading(GENERAL REQUIREMENTS/WELDING) 130건 과탐 →
windowed/clean doc 거짓 A_better 강등. 회사-접미사(INC./LLC…)만, cover 영역(앞 4노드)+미stored 게이트.
verdict 는 coarse 스크린(감사용)이고 실집행 결정 = 결정적 partition + 적대 워크플로임을 docstring 박제.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 12:58:36 +09:00
47 changed files with 2581 additions and 169 deletions
+22 -8
View File
@@ -150,15 +150,26 @@ def is_deferrable_error(exc: Exception) -> bool:
return isinstance(exc, httpx.TransportError)
async def call_deep_or_defer(client: "AIClient", prompt: str, system: str | None = None) -> str:
async def call_deep_or_defer(
client: "AIClient",
prompt: str,
system: str | None = None,
cfg: "AIModelConfig | None" = None,
) -> str:
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
deep_summary_worker / summarize_worker(drain) 가 공유. StageDeferred 는 queue_consumer/
queue_drain 이 attempts 미소모 + deferred_until 백오프로 처리한다 (sleep-안전 불변식).
deep_summary_worker / summarize_worker(drain) / classify_worker(drain) 가 공유.
StageDeferred 는 queue_consumer/queue_drain 이 attempts 미소모 + deferred_until
백오프로 처리한다 (sleep-안전 불변식).
cfg: 지정 시 deep 슬롯 대신 이 config 로 호출 (classify drain — deep 슬롯의
endpoint 는 쓰되 triage 의 temperature/max_tokens 를 적용한 변형).
"""
from models.queue import StageDeferred
try:
if cfg is not None:
return await client._request(cfg, prompt, system=system)
return await client.call_deep(prompt, system=system)
except Exception as exc:
if is_deferrable_error(exc):
@@ -231,20 +242,23 @@ class AIClient:
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str) -> dict:
async def classify(self, text: str, cfg=None) -> dict:
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 —
drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라
_call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파).
"""
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
response = await self._call_chat(self.ai.primary, prompt)
response = await self._call_chat(cfg or self.ai.primary, prompt)
return response
async def summarize(self, text: str, force_premium: bool = False) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체."""
async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일."""
if force_premium:
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
async def embed(self, text: str) -> list[float]:
"""벡터 임베딩 — GPU 서버 전용"""
+8
View File
@@ -244,7 +244,15 @@ async def regenerate(
user: Annotated[User, Depends(require_admin)],
):
"""수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요)."""
from core.config import settings
from workers.digest_worker import run
# 홀드 중 silent no-op 방지 — 워커 게이트와 동일 조건을 표면에서 명시.
if "digest" in settings.pipeline_held_stages:
raise HTTPException(
status_code=409,
detail="global_digest 보류 중 (config.yaml pipeline.held_stages) — 해제 후 재시도",
)
asyncio.create_task(run())
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
+160 -6
View File
@@ -2,8 +2,9 @@
확정 결정:
- D-1 경로 = /api/eid/chat (main.py prefix=/api/eid + 본 라우터 POST /chat)
- D-2 mode 닫힌 어휘: daily(mac-mini-default) / deep(qwen-macbook). 클라는 mode 만 보냄 —
claude-cloud / auto 금지 (Literal 로 422 차단). 심층(deep) 모드 무게이트.
- D-2 mode 닫힌 어휘: daily / deep — 둘 다 mac-mini-default (맥북 백지화 2026-06-11,
맥미니 Qwen 27B 단일 호스트. deep = ReAct 자동검색 모드 구분). 클라는 mode 만 보냄 —
claude-cloud / auto 금지 (Literal 로 422 차단). 게이트 = alias 기준 자동 적용(무게이트 폐지).
- D-3 독립 /chat 라우트 (frontend) — 본 모듈은 백엔드 API 만.
- D-5 LLM 호출 = EidAIClient.call_stream 한 곳 (이드 egress 봉쇄 불변식 #5,
RouterBackend 직접 호출 금지).
@@ -18,24 +19,58 @@ backend 실패는 /api/search/ask 와 동일 shape 의 503 + error_reason 매핑
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from typing import Annotated, Literal
import httpx
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field, field_validator, model_validator
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from core.utils import setup_logger
from eid import compose as eid_compose
from eid.ai import EidAIClient
from models.user import User
from services.llm.backends import BackendUnavailable
from services.llm.backends import BackendUnavailable, _router_url, get_backend
from services.search import llm_gate
from services.search.react_loop import agentic_ask_loop
logger = setup_logger("eid_chat")
router = APIRouter()
# ── ds-eid-ask-absorb P1: deep 모드 = ReAct 자동검색 (맥미니 Qwen 27B, 2026-06-11~) ──
# 비생성 reachability probe — router 도달만 확인(coarse). 27B(맥북) 자체 미가용은
# 첫 generate_with_tools 호출의 BackendUnavailable → mid-stream error envelope 로 커버
# (plan: probe 정밀도 불필요, TOCTOU 는 in-stream error 가 처리). ~2s 타임아웃·생성 슬롯 비점유.
_DEEP_PROBE_TIMEOUT = httpx.Timeout(connect=2.0, read=2.0, write=2.0, pool=2.0)
# heartbeat: ReAct 다회 tool call 시 수십초 무출력 → 프록시 idle timeout 차단.
# `{"phase":"ping"}` no-op 이벤트 (프론트 envelope 파서가 자연 스킵 — `: ping` comment 는
# POST SSE fetch 파서가 처리 보장 안 됨).
_HEARTBEAT_INTERVAL_S = 10.0
async def _probe_router_reachable() -> bool:
"""router(:8890) /v1/models GET — 도달 확인(비생성). 실패/비200 = 미가용."""
url = f"{_router_url().rstrip('/')}/v1/models"
try:
async with httpx.AsyncClient(timeout=_DEEP_PROBE_TIMEOUT) as client:
resp = await client.get(url)
return resp.status_code == 200
except Exception:
return False
def _sse(obj: dict) -> bytes:
"""SSE 이벤트 1건 — data: <json>\\n\\n. final_answer 는 OpenAI 호환 choices.delta.content
로, sources/phase 는 별 envelope 키로(프론트가 분기). model/usage 머신 메타 미포함."""
return b"data: " + json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n\n"
class ChatMessage(BaseModel):
"""채팅 턴 1건. role=system 은 Literal 밖 → 422 (system 합본은 서버 compose 만 주입)."""
@@ -71,16 +106,130 @@ class ChatRequest(BaseModel):
return self
@router.get("/status")
async def eid_status(
user: Annotated[User, Depends(get_current_user)],
):
"""이드 backend 점유 상태 스냅샷 — GET /api/eid/status (UI 의 "대기 vs 고장" 구분용).
daily(맥미니 MLX) 의 DS 프로세스 내부 llm_gate 점유만 본다 — 외부 소비자
(맥미니 자체 derived-worker·Hermes 등)의 endpoint 점유는 미포착.
따라서 busy=true 는 확실(지금 줄이 있다), false 는 근사(외부 점유 가능성 잔존).
가벼움 보장: DB 0 / LLM 0 / 본문 로깅 0 — 폴링 대상으로 안전.
자동 fallback 판단 근거로 쓰지 않는다 (모드 전환 = 명시 버튼만, 정책).
"""
snap = llm_gate.gate_status()
inflight = bool(snap["inflight"])
waiters = int(snap["waiters"])
return {
"daily": {
"busy": inflight or waiters > 0,
"inflight": inflight,
"waiters": waiters,
}
}
def _backend_unavailable_response(body: ChatRequest, reason: str, backend_name: str) -> JSONResponse:
"""스트림 시작 전 27B 미가용 → ask 컨벤션과 동일 shape 503 (자동 fallback 0)."""
logger.warning(
"eid_chat backend_unavailable mode=%s turns=%d status=503 reason=%s",
body.mode, len(body.messages), reason,
)
return JSONResponse(
status_code=503,
content={
"error": "backend_unavailable",
"error_reason": reason,
"backend_requested": backend_name,
"detail": (
"심층 엔진(검색)이 일시적으로 응답할 수 없습니다. "
"잠시 후 다시 시도하거나 일상 모드로 물어보세요."
),
},
)
async def _eid_chat_deep(body: ChatRequest, session: AsyncSession) -> StreamingResponse | JSONResponse:
"""deep 모드 = ReAct 자동검색. ReAct(`tool_choice=auto`)가 검색 여부를 LLM 자율 판단 —
검색 불요 질문은 early-exit 으로 대화 답변. substrate(persona+rules+react_ask task)는
agentic_ask_loop 내부 compose("react_ask") 가 주입(evidence-first 자동 상속).
멀티턴 = 1단계는 마지막 user 메시지 단독 처리(agentic_ask_loop 가 query: str — history
미지원). 후속 질문 대명사 해소는 2단계 백로그.
"""
# ① 첫 SSE 바이트(=HTTP 200 확정) 전 비생성 probe — router 도달 실패 시 503 (재매핑 가능 구간)
if not await _probe_router_reachable():
return _backend_unavailable_response(body, "router_unreachable", "mac-mini-default")
query = body.messages[-1].content # 메시지 단독 처리 (마지막 user 턴)
backend = get_backend("mac-mini-default")
async def _stream() -> AsyncIterator[bytes]:
# ② phase:searching 방출 = HTTP 200 확정. 이후 미가용은 503 불가 → in-stream error.
yield _sse({"phase": "searching"})
task = asyncio.create_task(agentic_ask_loop(session, query, backend=backend))
try:
# heartbeat: task 미완 동안 ~10s 마다 ping (shield 로 wait_for 취소가 task 안 죽임)
while not task.done():
try:
await asyncio.wait_for(asyncio.shield(task), timeout=_HEARTBEAT_INTERVAL_S)
except asyncio.TimeoutError:
yield _sse({"phase": "ping"})
result = task.result() # BackendUnavailable 은 여기서 raise (mid-stream)
# final_answer = OpenAI 호환 1청크(프론트 기존 content 누적 경로 재사용)
yield _sse({"choices": [{"delta": {"content": result.final_answer}}]})
# 근거 = 별 envelope (citation 번호 없음 — 프론트가 순서 기반). partial = 근거 부족 표식
yield _sse({"eid_sources": result.sources, "partial": result.partial})
yield b"data: [DONE]\n\n"
logger.info(
"eid_chat deep ok turns=%d sources=%d partial=%s iters=%d",
len(body.messages), len(result.sources), result.partial, result.iterations,
)
except BackendUnavailable as exc:
# mid-stream 미가용(검색 중 AC 분리·뚜껑 닫힘) — 200 이미 송신, in-stream error envelope.
# error 뒤 [DONE] = 프론트 sawDone 로 '중단' 오경보 방지(명시 error notice 유지).
logger.warning(
"eid_chat deep mid-stream unavailable turns=%d reason=%s",
len(body.messages), exc.reason,
)
yield _sse({"phase": "error", "error_reason": exc.reason})
yield b"data: [DONE]\n\n"
except asyncio.CancelledError:
raise # 클라 disconnect — finally 가 task 정리
except Exception:
logger.exception("eid_chat deep stream failed turns=%d", len(body.messages))
yield _sse({"phase": "error", "error_reason": "deep_failed"})
yield b"data: [DONE]\n\n"
finally:
# 클라 disconnect 시 ReAct task 고아화 방지 — cancel + await(전파 완료 보장).
# 안 하면 27B 가 닫힌 연결 위해 수분 점유, router 동시성상 다음 검색 대기.
if not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass
return StreamingResponse(
_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-store", "X-Accel-Buffering": "no"},
)
@router.post("/chat")
async def eid_chat(
body: ChatRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""이드 채팅 — router SSE 스트리밍 pass-through.
"""이드 채팅 — daily = router SSE pass-through(대화) / deep = ReAct 자동검색(근거).
503 경로 (둘 다 자동 fallback 없음):
503 경로 (모두 자동 fallback 없음):
- substrate_degraded: rules.md 부재 (D-6 fail-closed, 채팅 진행 거부)
- backend_unavailable: 스트림 시작 전 backend 실패 (ask 컨벤션과 동일 shape)
- backend_unavailable: 스트림 시작 전 backend 실패 (daily/deep 공통, ask 컨벤션 shape)
"""
# D-6: rules 부재 = fail-closed. 채팅은 안전·정책 가드 없이 진행하지 않는다(배너 X).
if not eid_compose.rules_present():
@@ -99,6 +248,11 @@ async def eid_chat(
},
)
# deep = ReAct 자동검색 (별 흐름 — probe + 동기 ReAct → SSE 변환)
if body.mode == "deep":
return await _eid_chat_deep(body, session)
# daily = 순수 대화 SSE pass-through (기존)
system = eid_compose.compose("eid_chat", task="")
client = EidAIClient()
stream = client.call_stream(
+94 -7
View File
@@ -1,20 +1,30 @@
"""처리 머신 보드 API — GET /api/queue/overview (plan ds-processing-ui-6an).
"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계 로직은
services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와 계약 고정.
응답에 raw 모델명 노출 금지 — 머신 label 만.
- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
FE 정적 맵 책임).
- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
(자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
"""
from datetime import datetime
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from models.user import User
from services.queue_overview import build_overview
from services.queue_overview import (
build_overview,
fetch_failed_items,
retry_failed,
skip_failed,
)
router = APIRouter()
@@ -64,11 +74,17 @@ class Totals(BaseModel):
class StageRow(BaseModel):
"""단계별 현황 행 — '단계 상세' 패널용 (완료 가시화)."""
"""단계별 현황 행 — 흐름 노드/상세 패널용.
done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
"""
stage: str
pending: int
processing: int
failed: int
done_1h: int
created_1h: int
done_today: int
oldest_pending_age_sec: int | None
@@ -81,6 +97,40 @@ class QueueOverviewResponse(BaseModel):
totals: Totals
class FailedItem(BaseModel):
"""영구 실패 행 — 실패 드로어 표시 단위."""
id: int
stage: str
document_id: int
title: str
attempts: int
max_attempts: int
error_message: str | None
failed_at: datetime | None
class FailedListResponse(BaseModel):
items: list[FailedItem]
total: int
class QueueActionRequest(BaseModel):
"""재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
ids: list[int] = Field(min_length=1, max_length=300)
class RetryResponse(BaseModel):
requested: int
retried: int
not_retried: int
class SkipResponse(BaseModel):
requested: int
skipped: int
not_skipped: int
@router.get("/overview", response_model=QueueOverviewResponse)
async def get_queue_overview(
user: Annotated[User, Depends(get_current_user)],
@@ -88,3 +138,40 @@ async def get_queue_overview(
):
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
return QueueOverviewResponse.model_validate(await build_overview(session))
@router.get("/failed", response_model=FailedListResponse)
async def get_failed_items(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
items = await fetch_failed_items(session)
return FailedListResponse(
items=[FailedItem.model_validate(i) for i in items],
total=len(items),
)
@router.post("/retry", response_model=RetryResponse)
async def retry_failed_items(
body: QueueActionRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""실패 행 재시도 — attempts 리셋 + pending 복귀.
not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
"""
return RetryResponse.model_validate(await retry_failed(session, body.ids))
@router.post("/skip", response_model=SkipResponse)
async def skip_failed_items(
body: QueueActionRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
return SkipResponse.model_validate(await skip_failed(session, body.ids))
+28
View File
@@ -158,6 +158,17 @@ class Settings(BaseModel):
# 업로드 한도 (authoritative policy)
upload: UploadConfig = UploadConfig()
# 생성 LLM 홀드 (2026-06-11): config.yaml pipeline.held_stages 에 든 이름의
# 컨슈머/워커는 claim 자체를 하지 않는다 (attempts 미소모, pending 적체 = 의도).
# 유효 키 = 큐 stage 명(classify/summarize/deep_summary) + cron/컨슈머 키(digest,
# briefing, study_explanation, study_session_analysis, study_memo_card).
# 빈 리스트 = 무동작 (기존 동작 그대로).
pipeline_held_stages: list[str] = []
# mlx gate 동시 실행 상한 (2026-06-12, config.yaml pipeline.mlx_gate_concurrency).
# 1 = 구 single-inference 동작. 2 = continuous batching 활용 (llm_gate docstring 참조).
mlx_gate_concurrency: int = 1
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
@@ -244,6 +255,21 @@ def load_settings() -> Settings:
)
)
pipeline_held_stages: list[str] = []
mlx_gate_concurrency = 1
if config_path.exists() and raw and "pipeline" in raw:
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
if not isinstance(held_raw, (list, tuple)):
held_raw = [held_raw]
pipeline_held_stages = [str(s) for s in held_raw]
try:
mlx_gate_concurrency = max(
1, int((raw.get("pipeline") or {}).get("mlx_gate_concurrency", 1))
)
except (TypeError, ValueError):
mlx_gate_concurrency = 1
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
upload_cfg = (
@@ -272,6 +298,8 @@ def load_settings() -> Settings:
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
internal_worker_token=internal_worker_token,
pipeline_held_stages=pipeline_held_stages,
mlx_gate_concurrency=mlx_gate_concurrency,
)
+10 -7
View File
@@ -29,16 +29,19 @@ import httpx
from ai.client import AIClient
from services.llm.backends import (
MAC_MINI_DEFAULT,
QWEN_MACBOOK,
BackendUnavailable,
_router_url, # router URL 단일 출처 재사용 (settings → env LLM_ROUTER_URL → MVP default)
)
from services.search.llm_gate import Priority, acquire_mlx_gate
# 이드 채팅 mode → router alias 닫힌 매핑 (D-2). 클라는 mode 만 보냄 — claude-cloud/auto 금지.
# 2026-06-11 맥북 백지화: deep 도 mac-mini-default (맥미니 Qwen 27B 단일 호스트).
# mode 구분은 유지 — deep = ReAct 자동검색 경로(모델이 아니라 동작이 다름).
# 게이트는 alias==MAC_MINI_DEFAULT 조건이라 deep 도 자동으로 mlx gate 적용
# (llm_gate "예외 없이 gate 획득 필수" invariant 충족 — 구 무게이트는 맥북 예외였음).
_CHAT_ALIAS: dict[str, str] = {
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801 gemma-4-26b
"deep": QWEN_MACBOOK, # router named upstream → M5 Max Qwen3.6-27B (무게이트, D-2)
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801
"deep": MAC_MINI_DEFAULT, # 맥북 폐기로 동일 upstream — ReAct 검색 모드 구분만 유지
}
# read 는 per-chunk 적용이라 MacBook wake(24s)+토큰 생성 간격 커버. connect 는 내부 router 라 짧게.
@@ -161,10 +164,10 @@ class EidAIClient(AIClient):
_rewrite_sse_line 으로 model 치환(mode 어휘)·usage 제거만 하고 프레이밍은 보존.
취소/disconnect 시 AsyncExitStack 이 response·client 정리(upstream 닫힘 보장).
daily(mac-mini-default)는 Mac mini MLX 단일 inference 영구 룰(llm_gate docstring
"예외 없이 gate 획득 필수")에 따라 acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 —
RouterBackend 의 requires_gate=True 와 동일한 client-side mutex 효과.
deep(qwen-macbook)은 별 endpoint 라 무게이트 (D-2, RouterBackend 동형).
daily/deep 모두 mac-mini-default(2026-06-11 맥북 백지화) → Mac mini MLX 단일
inference 영구 룰(llm_gate docstring "예외 없이 gate 획득 필수")에 따라
acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 — 게이트 조건이 alias 기준이라
deep 도 자동 적용 (구 무게이트는 맥북 별 endpoint 시절 예외였음).
중계 전체(업스트림 진입~종료)는 asyncio.timeout(_STREAM_DEADLINE_S) wall-clock
deadline 안 — llm_gate 계약 "timeout 은 gate 안쪽" 준수(gate 대기엔 미적용).
+4 -1
View File
@@ -61,7 +61,7 @@ async def lifespan(app: FastAPI):
from workers.csb_collector import run as csb_collector_run
from workers.api_standards_collector import run as api_standards_run
from workers.ccps_collector import run as ccps_collector_run
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
@@ -95,6 +95,9 @@ async def lifespan(app: FastAPI):
# 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을
# stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지.
scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer")
# 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 —
# classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례).
scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
+5
View File
@@ -14,6 +14,11 @@ from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
# FK("users.id") 해석에 users 테이블 메타데이터 필요 — fastapi 앱은 어차피 전 모델을
# import 하지만, CLI 단독 실행(queue_drain 등)은 본 모듈만 끌어와 INSERT 시
# "could not find table 'users'" 로 실패했다 (2026-06-12 drain 로그 실측). 명시 import.
from models.user import User # noqa: F401
class AnalyzeEvent(Base):
__tablename__ = "analyze_events"
+104 -1
View File
@@ -22,7 +22,7 @@ from datetime import datetime, timedelta
from posixpath import basename
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
@@ -258,6 +258,8 @@ def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
"pending": st["pending"],
"processing": st["processing"],
"failed": st["failed"],
"done_1h": st["done_1h"],
"created_1h": st["created_1h"],
"done_today": st["done_today"],
"oldest_pending_age_sec": age,
})
@@ -408,3 +410,104 @@ async def build_overview(session: AsyncSession) -> dict:
deep_enabled=deep_enabled,
now_kst=now_kst,
)
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
# started_at 을 시각 fallback 으로 쓴다.
_FAILED_LIST_SQL = """
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
q.error_message,
COALESCE(q.completed_at, q.started_at) AS failed_at,
d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'failed'
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
LIMIT 300
"""
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
_RETRY_SQL = """
UPDATE processing_queue q
SET status = 'pending', attempts = 0,
started_at = NULL, completed_at = NULL
WHERE q.id IN :ids
AND q.status = 'failed'
AND NOT EXISTS (
SELECT 1 FROM processing_queue p
WHERE p.document_id = q.document_id
AND p.stage = q.stage
AND p.status IN ('pending', 'processing')
AND p.id <> q.id
)
RETURNING q.id
"""
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
_SKIP_SQL = """
UPDATE processing_queue
SET status = 'completed', completed_at = NOW(),
payload = COALESCE(payload, '{}'::jsonb)
|| jsonb_build_object('skipped_by_user', true,
'skipped_at', NOW()::text)
WHERE id IN :ids AND status = 'failed'
RETURNING id
"""
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
return [
{
"id": r[0],
"stage": r[1],
"document_id": r[2],
"attempts": int(r[3] or 0),
"max_attempts": int(r[4] or 0),
"error_message": r[5],
"failed_at": r[6],
"title": display_title({
"document_id": r[2],
"title": r[7],
"original_filename": r[8],
"file_path": r[9],
}),
}
for r in rows
]
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
unique_ids = list(set(ids))
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"retried": len(retried),
"not_retried": len(unique_ids) - len(retried),
}
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
unique_ids = list(set(ids))
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"skipped": len(skipped),
"not_skipped": len(unique_ids) - len(skipped),
}
+40 -12
View File
@@ -26,8 +26,11 @@ PR-MacBook-RAG-Backend-1 부터 `services.llm.QwenMacBookBackend` 는 별 endpoi
- **fallback(Claude Sonnet 4 API) 경로는 gate 제외**. PR #20 이후 fallback = Claude API. 단 현재
구현상 `AIClient._call_chat` 내부에서 primaryfallback 전환이 일어나므로
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
inference 특성이 깨지지 않는 값을 올리지 .
- ~~**MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**~~ **2026-06-12 개정**:
룰의 전제(서버 = single-inference) 소멸 mlx_vlm server continuous
batching 으로 동시 스트림 흡수(실측). 상한은 config `pipeline.mlx_gate_concurrency`
(기본 1, 운영 2). **게이트 자체(상한+우선순위 ) 영구 유지** thundering herd
(23 concurrent 22 timeout 사고) 방지는 계속 상한이 담당. 무제한 금지.
## 우선순위 정책 (B-1, 2026-05-17)
@@ -80,8 +83,22 @@ from core.utils import setup_logger
logger = setup_logger("llm_gate")
# MLX primary는 single-inference → 1
MLX_CONCURRENCY = 1
def _capacity() -> int:
"""게이트 동시 실행 상한 — config.yaml `pipeline.mlx_gate_concurrency` (기본 1).
2026-06-12 일반화: "MLX_CONCURRENCY = 1 고정" 영구 룰의 전제( 서버 = single-
inference, 23 concurrent 22 timeout 실측) 소멸 mlx_vlm server
continuous batching 으로 동시 스트림을 흡수(2026-06-11 6~8 concurrent 실측
정상). 게이트 자체(상한 + 우선순위) 유지하고 상한만 config thundering
herd 재발 방지는 상한이 계속 담당한다. 런타임 acquire 조회라
config 변경 + 프로세스 재기동으로 반영, 테스트는 settings monkeypatch.
"""
from core.config import settings
try:
return max(1, int(getattr(settings, "mlx_gate_concurrency", 1)))
except (TypeError, ValueError):
return 1
# Background waiter wait_ms 가 이 값 초과 시 WARN (starvation 신호, aging mitigation 은 Phase 2)
STARVATION_WARN_MS = 300_000 # 5 min
@@ -101,7 +118,7 @@ DEFAULT_PRIORITY: Priority = Priority.BACKGROUND
# Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float)
_waiters: list[tuple[int, int, asyncio.Future, float]] = []
_seq = itertools.count()
_inflight: bool = False
_inflight_n: int = 0 # 동시 실행 수 (구 bool — capacity 일반화로 카운터)
_lock: asyncio.Lock | None = None
@@ -143,7 +160,7 @@ async def acquire_mlx_gate(
`asyncio.timeout` 반드시 gate 안쪽 (Future await ) .
"""
global _inflight, _waiters
global _inflight_n, _waiters
lock = _get_lock()
seq = next(_seq)
@@ -152,9 +169,9 @@ async def acquire_mlx_gate(
fut: asyncio.Future | None = None
async with lock:
if not _inflight and not _waiters:
if _inflight_n < _capacity() and not _waiters:
# fast path — 즉시 inflight 진입, Future 생성 안 함
_inflight = True
_inflight_n += 1
else:
# 대기열 진입
fut = asyncio.get_event_loop().create_future()
@@ -194,8 +211,8 @@ async def acquire_mlx_gate(
async with lock:
next_fut = _dispatch_next_locked()
if next_fut is None:
_inflight = False
# _inflight 는 True 유지 (다음 waiter 가 진입 예정)
_inflight_n = max(0, _inflight_n - 1)
# next_fut 가 있으면 슬롯 handover — 카운트 유지 (다음 waiter 가 진입 예정)
logger.debug(
"mlx_gate release duration_ms=%.0f priority=%s seq=%d",
duration_ms, priority.name, seq,
@@ -222,13 +239,24 @@ def get_mlx_gate():
return acquire_mlx_gate(DEFAULT_PRIORITY)
# ── Read-only status (UI 표시용) ─────────────────────────────────────────────
def gate_status() -> dict:
"""현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용).
inflight = 동시 실행 (int). 기존 소비자(eid status) bool() 캐스팅이라 호환.
"""
return {"inflight": _inflight_n, "waiters": len(_waiters)}
# ── Test helpers (conftest reset) ────────────────────────────────────────────
def _reset_for_test() -> None:
"""테스트 fixture 가 fresh loop 마다 호출. production code 에서 사용 X."""
global _waiters, _inflight, _lock, _seq
global _waiters, _inflight_n, _lock, _seq
_waiters = []
_inflight = False
_inflight_n = 0
_lock = None
_seq = itertools.count()
+65 -1
View File
@@ -63,8 +63,41 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain).
QWEN3_QUERY_INSTRUCT = (
"Instruct: Given a web search query, retrieve relevant passages that answer the query"
"\nQuery: "
)
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
@@ -137,6 +170,34 @@ async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
return None
async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None:
"""Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix.
쿼리 전용: QWEN3_QUERY_INSTRUCT 선두에 붙인다 (문서 적재 = plain).
embed_dimensions 지정(qwen4m) Ollama dimensions 옵션 = MRL truncate+재정규화
(G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 slug 분포 상이.
"""
if not text_:
return None
import httpx
body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]}
if cfg.get("embed_dimensions"):
body["dimensions"] = cfg["embed_dimensions"]
try:
async with httpx.AsyncClient(timeout=60.0) as c:
r = await c.post(cfg["embed_endpoint"], json=body)
r.raise_for_status()
embs = r.json().get("embeddings")
if not isinstance(embs, list) or not embs or not isinstance(embs[0], list):
raise ValueError("unexpected /api/embed shape")
return embs[0]
except Exception as exc:
logger.warning(
"candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc
)
return None
def _query_embed_key(text_: str) -> str:
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
@@ -323,7 +384,10 @@ async def search_vector(
else:
docs_table = cfg["docs_table"]
chunks_table = cfg["chunks_table"]
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
if cfg.get("embed_kind") == "ollama":
query_embedding = await _embed_query_via_ollama(cfg, query)
else:
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
+1 -1
View File
@@ -47,7 +47,7 @@ logger = setup_logger("synthesis")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
PROMPT_VERSION = "v2"
LLM_TIMEOUT_MS = 30000 # 2026-05-17 B-3: 15s 시 동시 부하 (Mac mini 26B classifier+evidence+synthesis serialized) 빈발 timeout — classifier (30s) 와 align
LLM_TIMEOUT_MS = 120000 # 2026-06-11 Qwen3.6-27B-6bit 전환: 프리필 ~112 tok/s·디코드 ~11.7 tok/s 실측 — 30s 면 synthesis(답변 본체) 상시 timeout. synthesis 는 graceful skip 불가(=답변 실패)라 단독 상향, config ask.backend.timeout_read_s=120 와 align
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
CACHE_MAXSIZE = 300
MAX_ANSWER_CHARS = 600
+4
View File
@@ -8,6 +8,7 @@
import asyncio
from datetime import date
from core.config import settings
from core.utils import setup_logger
from services.briefing.pipeline import run_briefing_pipeline
@@ -22,6 +23,9 @@ async def run(target_date: date | None = None) -> dict | None:
Args:
target_date: KST 기준 briefing_date (None = 오늘). API regenerate 명시 지정 가능.
"""
if "briefing" in settings.pipeline_held_stages:
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
return None
try:
result = await asyncio.wait_for(
run_briefing_pipeline(target_date),
+63 -14
View File
@@ -31,12 +31,18 @@ from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response, strip_thinking
from ai.client import (
AIClient,
call_deep_or_defer,
is_deferrable_error,
parse_json_response,
strip_thinking,
)
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import enqueue_stage
from models.queue import StageDeferred, enqueue_stage
from policy.prompt_render import render_4b, policy_version as compute_policy_version
from policy.routing import decide_routing
from services.document_telemetry import record_analyze_event
@@ -345,13 +351,20 @@ _FRONTMATTER_PRESERVED_KEYS = {
# ───────────────────────── main process ────────────────────────────────
async def process(document_id: int, session: AsyncSession) -> None:
async def process(
document_id: int, session: AsyncSession, *, use_deep: bool = False
) -> None:
"""문서 분류 + 요약 + tier triage.
1) Legacy: classify() ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion
2) Legacy: summarize() ai_summary
3) PR-B B-1: summary_triage (4B) ai_tldr/ai_bullets/ai_analysis_tier='triage'
use_deep (2026-06-12 fair-share, queue_drain 전용): triage LLM 호출을 deep 슬롯
(맥북, 라우터 경유)으로 보낸다 sampling triage temperature/max_tokens
유지(분류 결정성), endpoint 교체. 맥북 불가 = StageDeferred 전파(drain
보류 처리). False(기본/consumer) = 기존 call_triage(맥미니 직접) 그대로.
예외 source_channel='law_monitor':
법령은 외부 source-of-truth (law.go.kr) 보유 + immutable + 자동 재수집.
AI 분류는 무가치 + 본문 해석 환각 위험. 26B legacy + 4B triage 전부 skip.
@@ -446,10 +459,20 @@ async def process(document_id: int, session: AsyncSession) -> None:
logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완")
client = AIClient()
# fair-share (2026-06-12): use_deep 시 legacy classify/summarize 도 deep 슬롯(맥북)
# 경유 — 그래야 drain 의 "맥북 분담" 이 실제로 성립 (triage 만 보내면 50K 요약
# 프리필이 맥미니에 남는다). deep 슬롯 sampling = primary 와 동일(0.3/0.9/8192).
legacy_cfg = settings.ai.deep if (use_deep and settings.ai.deep is not None) else None
try:
# ─── 1. Legacy classify (primary 26B) ───
# ─── 1. Legacy classify (primary 또는 deep) ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated)
try:
raw_response = await client.classify(truncated, cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
# 맥북 불가 — 첫 호출(최저 비용 지점)에서 보류로 전환, doc 쓰기 0
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
parsed = parse_json_response(raw_response)
if not parsed:
@@ -517,12 +540,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
"reason": "classify pipeline",
}
# ─── 2. Legacy 요약 (primary 26B) ───
summary = await client.summarize(doc.extracted_text[:50000])
# ─── 2. Legacy 요약 (primary 또는 deep) ───
try:
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 (legacy 완료) ───
doc.ai_model_version = settings.ai.primary.model
# ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
doc.ai_model_version = (legacy_cfg or settings.ai.primary).model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
@@ -533,7 +561,9 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
try:
await _run_tier_triage(client, doc, session)
await _run_tier_triage(client, doc, session, use_deep=use_deep)
except StageDeferred:
raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리
except Exception as exc:
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
@@ -541,8 +571,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
await client.close()
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
"""summary_triage (p3a_short_summary) 경로."""
async def _run_tier_triage(
client: AIClient, doc: Document, session: AsyncSession, *, use_deep: bool = False
) -> None:
"""summary_triage (p3a_short_summary) 경로. use_deep = process() 에서 전달 (drain 전용)."""
document_id = doc.id
text = doc.extracted_text or ""
input_chars = len(text)
@@ -550,6 +582,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
triage_start = time.perf_counter()
parse_error: str | None = None
triage_out = TriageOutput()
# drain 경유 시 triage 도 deep 슬롯(맥북) — sampling 은 triage 것 유지(결정성).
deep_triage_cfg = None
if use_deep and settings.ai.deep is not None:
deep_triage_cfg = settings.ai.deep.model_copy(update={
"temperature": settings.ai.triage.temperature,
"top_p": settings.ai.triage.top_p,
"max_tokens": settings.ai.triage.max_tokens,
})
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
if input_chars > TRIAGE_TEXT_LIMIT:
@@ -590,7 +630,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])
try:
raw_triage = await client.call_triage(prompt)
if deep_triage_cfg is not None:
# drain 전용 — deep 슬롯 endpoint + triage sampling. 맥북 불가(StageDeferred)
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
else:
raw_triage = await client.call_triage(prompt)
except StageDeferred:
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
except Exception as exc:
logger.warning(
"[triage] 4B 호출 실패 id=%s type=%s repr=%r",
@@ -656,6 +703,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
escalation_reason=escalation_reason,
parse_error=parse_error,
routing_decision=routing_decision,
model_name=(deep_triage_cfg.model if deep_triage_cfg is not None else None),
)
@@ -670,6 +718,7 @@ async def _apply_triage_result(
escalation_reason: str | None,
parse_error: str | None,
routing_decision=None,
model_name: str | None = None, # fair-share: 실제 호출 경로 모델 (None=triage 기본)
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.
@@ -760,7 +809,7 @@ async def _apply_triage_result(
layers_returned=["tldr", "bullets"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.triage.model,
model_name=(model_name or settings.ai.triage.model),
prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
error_code=parse_error,
source="document_server",
+27 -7
View File
@@ -54,8 +54,18 @@ class DeepSummaryOutput(BaseModel):
confidence: float = 0.5
async def process(document_id: int, session: AsyncSession) -> None:
"""deep_summary 큐 pickup → 26B 호출 → 필드 저장."""
async def process(
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
) -> None:
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
defer_on_deep_unavailable:
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 즉시
맥미니 primary 처리. 2026-06-12 fair-share: 머신이 동일 모델
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
True (queue_drain 전용) = 맥북 불가를 StageDeferred 올려 drain
보류 run 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"deep_summary: document id={document_id} 없음")
@@ -111,16 +121,26 @@ async def process(document_id: int, session: AsyncSession) -> None:
try:
start = time.perf_counter()
if deep_cfg is not None:
# 맥북 경유 — 맥미니 mlx gate 미점유(게이트는 맥미니 보호 목적). 맥북 불가
# (503/연결/생성 중 sleep 절단)는 StageDeferred = 보류, 맥미니 강등 없음.
# doc 쓰기는 완주+파싱 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
raw = await call_deep_or_defer(client, prompt)
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
try:
raw = await call_deep_or_defer(client, prompt)
except StageDeferred:
if defer_on_deep_unavailable:
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
logger.info(
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
)
used_cfg = settings.ai.primary
async with acquire_mlx_gate(Priority.BACKGROUND):
raw = await client.call_primary(prompt)
else:
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
latency_ms = int((time.perf_counter() - start) * 1000)
except StageDeferred:
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), consumer 가 백오프 기록.
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
+4
View File
@@ -10,6 +10,7 @@ global_digests / digest_topics 테이블에 저장한다.
import asyncio
from core.config import settings
from core.utils import setup_logger
from services.digest.pipeline import run_digest_pipeline
@@ -24,6 +25,9 @@ async def run() -> None:
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout summarizer 처리).
여기서는 전체 hard cap 강제.
"""
if "digest" in settings.pipeline_held_stages:
logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip")
return
try:
result = await asyncio.wait_for(
run_digest_pipeline(),
+142
View File
@@ -0,0 +1,142 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) 중단/재실행 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path snapshot filter 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker [제목][섹션][본문]) + plain (instruct prefix 쿼리 전용 G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m CLI 대상이 아님 qwen4 적재 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
+46 -3
View File
@@ -13,18 +13,25 @@ from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
@@ -34,14 +41,21 @@ STALE_THRESHOLD_MINUTES = 10
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리.
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk",
"extract", "classify", "summarize",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
FAST_QUEUE_STAGES = ["embed", "chunk"]
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
@@ -335,14 +349,43 @@ async def _process_stage(stage, worker_fn):
async def consume_queue():
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
global _hold_logged
workers = _load_workers()
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
if held and not _hold_logged:
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
_hold_logged = True
try:
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("stale reset failed, but continuing queue consumption")
for stage in MAIN_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
async def consume_fast_queue():
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
main 루프는 classify/summarize/deep 사이클을 단위로 점유해 건당 <1s 짜리
embed/chunk 사이클당 1번씩만 기회를 얻었다 (실효 ~60/ = 적체 원인).
분리 = 1 × 배치 10 ~600/. APScheduler max_instances=1 이라
배치가 1분을 넘으면 다음 fire coalesce (폭주 방지).
"""
workers = _load_workers()
try:
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("fast stale reset failed, but continuing queue consumption")
for stage in FAST_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
+18 -6
View File
@@ -30,9 +30,12 @@ from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
logger = setup_logger("queue_drain")
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유).
# classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4).
DRAIN_STAGES = ("summarize", "deep_summary")
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 / classify = triage 분류.
# classify 는 2026-06-12 fair-share 로 합류 — 구 제외 사유(plan Q-4 "triage 경량 = 맥미니
# 적합")는 Gemma a4b(42 tok/s) 전제. Qwen 27B 전환 후 classify 가 장문 프리필로 컨슈머
# 사이클을 점유하는 최대 병목이라, 맥북(프리필 ~5배)이 가장 효과적인 분담처다.
# classify 완료 시 enqueue_next_stage(embed/chunk/markdown) 필수 — 누락 = DAG 단절.
DRAIN_STAGES = ("summarize", "deep_summary", "classify")
async def _claim_one(stage: str) -> tuple[int, int] | None:
@@ -98,14 +101,16 @@ async def _mark_failed(queue_id: int, exc: Exception) -> None:
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
if stage not in DRAIN_STAGES:
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용")
if settings.ai.deep is None:
raise SystemExit(
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
" (맥미니로의 silent 강등 금지)"
)
from workers.classify_worker import process as classify_process
from workers.deep_summary_worker import process as deep_summary_process
from workers.queue_consumer import enqueue_next_stage
from workers.summarize_worker import process as summarize_process
done = failed = 0
@@ -121,11 +126,18 @@ async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int
async with async_session() as worker_session:
if stage == "summarize":
await summarize_process(document_id, worker_session, use_deep=True)
elif stage == "classify":
await classify_process(document_id, worker_session, use_deep=True)
else:
# deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유
await deep_summary_process(document_id, worker_session)
# deep_summary: drain 은 맥북 전용 레버 — 불가 시 보류(폴백은 consumer 만)
await deep_summary_process(
document_id, worker_session, defer_on_deep_unavailable=True
)
await worker_session.commit()
await _mark_completed(queue_id)
# 다음 stage 연쇄 — classify 는 embed/chunk/markdown enqueue (consumer 와 동형,
# summarize/deep_summary 는 next_stages 미등록이라 no-op)
await enqueue_next_stage(document_id, stage)
done += 1
consecutive_defers = 0
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
@@ -14,6 +14,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_memo_card_job import StudyMemoCardJob
@@ -50,6 +51,10 @@ async def reset_stale_card_jobs() -> None:
async def consume_study_memo_card_queue() -> None:
"""APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_memo_card" in settings.pipeline_held_stages:
logger.debug("study_memo_card 보류 (pipeline.held_stages)")
return
await reset_stale_card_jobs()
async with async_session() as session:
+5
View File
@@ -59,6 +59,11 @@ async def reset_stale_study_jobs() -> None:
async def consume_study_queue() -> None:
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트.
# pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관).
if "study_explanation" in settings.pipeline_held_stages:
logger.debug("study_explanation 보류 (pipeline.held_stages)")
return
await reset_stale_study_jobs()
async with async_session() as session:
@@ -12,6 +12,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
@@ -48,6 +49,10 @@ async def reset_stale_session_jobs() -> None:
async def consume_study_session_queue() -> None:
"""APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_session_analysis" in settings.pipeline_held_stages:
logger.debug("study_session_analysis 보류 (pipeline.held_stages)")
return
await reset_stale_session_jobs()
async with async_session() as session:
+40 -12
View File
@@ -6,25 +6,40 @@ ai:
models:
# ─── 단일 generation 호스트 routing (2026-05-14 GPU LLM 제거) ───
# GPU Ollama gemma4:e4b-it-q8_0 제거. Mac mini 26B-A4B 가 triage + primary + classifier 모두 흡수.
# fallback 은 Claude Sonnet 4 API (Mac mini 다운 시 자동 trigger, premium 과 budget 공유).
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E
# 2026-06-11 B안: 맥미니 모델 = Gemma 26B-A4B → Qwen3.6-27B-6bit 풀교체 (사용자 결정).
# dense 27B 라 디코드 ~13 tok/s 급 (a4b ~42 대비 감속) → timeout 상향 (triage 30→120, primary 180→300).
# fallback 은 Claude Sonnet 4 API (CLAUDE_API_KEY 미주입 = 비활성).
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E + project_macmini_model_decision
# triage: 상시 분류·요약·근거 선별. Mac mini 26B (primary 와 동일 endpoint, 짧은 max_tokens).
# triage: 상시 분류·요약·근거 선별. Mac mini Qwen 27B (primary 와 동일 endpoint, 짧은 max_tokens).
triage:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 4096
timeout: 30
timeout: 480 # 프리필 실측 ~112 tok/s — 120K자 장문 커버 (2026-06-11)
context_char_limit: 120000
temperature: 0.0
# primary: 에스컬레이션 전용. 26B MLX (맥미니 Semaphore(1) 보호 대상).
# primary: 에스컬레이션 전용. Qwen 27B MLX (맥미니 Semaphore(1) 보호 대상).
primary:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 8192
timeout: 180
timeout: 900 # 프리필 실측 ~112 tok/s — 260K자 상한 장문 커버 (2026-06-11)
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
# deep: 야간 night-drain 전용 — 맥북 M5 Max Qwen3.6-27B-6bit (llm-router :8890 경유,
# model=qwen-macbook alias). 2026-06-11 재도입 (사용자: 자기 전 night-drain 으로 백로그 분담).
# 맥북 불가(503/연결/절단) = StageDeferred 보류 — 맥미니/cloud 강등 없음, attempts 미소모.
# consumer 의 deep_summary 도 슬롯 존재 시 맥북 경유 (잠들어 있으면 30분 백오프 보류 = 무해).
# 슬롯 제거 시 deep_summary 는 primary(맥미니) 경로 복귀.
deep:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
model: "qwen-macbook"
max_tokens: 8192
timeout: 900
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
@@ -58,9 +73,9 @@ ai:
# classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only).
classifier:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit" # 2026-06-11 B안 동승 — gemma id 잔존 시 mlx 서버가 Gemma 를 재로드(이중 적재) 위험
max_tokens: 512
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진 (Mac mini 26B concurrent load). classifier_service.LLM_TIMEOUT_MS=30000 와 align
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진. classifier_service.LLM_TIMEOUT_MS=30000 와 align (초과 = score-only skip, graceful)
# 제거: vision (미사용)
# ─── deep_summary enqueue 폭발 억제 (B-1 R2) ───
@@ -84,7 +99,7 @@ search:
macbook_url: "http://100.118.112.84:8810" # MacBook M5 Max Tailscale interface bind
macbook_model: "mlx-community/Qwen3.6-27B-8bit"
timeout_connect_s: 1 # MacBook sleep/wake 빠른 감지 (자동 fallback 부재 → 빠른 503)
timeout_read_s: 30 # synthesis_service.LLM_TIMEOUT_MS=30000 와 align
timeout_read_s: 120 # 2026-06-11 Qwen 27B(디코드 ~11.7 tok/s) — synthesis_service.LLM_TIMEOUT_MS=120000 와 align
# PR-DocSrv-Ask-ToolCalling-ReAct-1: /api/search/ask/react ReAct loop (qwen-macbook only)
react:
enabled: true
@@ -176,3 +191,16 @@ schedule:
daily_digest: "20:00"
file_watcher_interval_minutes: 5
queue_consumer_interval_minutes: 10
# 생성 LLM 홀드 게이트 (2026-06-11 신설): held_stages 에 든 이름의 컨슈머/워커는 claim 자체를
# 하지 않는다 (attempts 미소모, pending 적체). 유효 키 8 = classify/summarize/deep_summary(큐) +
# digest/briefing(cron) + study_explanation/study_session_analysis/study_memo_card(컨슈머).
# 그 외 문자열은 무동작(오타 주의). 적용/해제 = 리스트 수정 후 fastapi 재기동.
# 이력: 2026-06-11 맥미니 모델 확정까지 8키 홀드 → 同日 Qwen3.6-27B-6bit 전환과 함께 해제([]).
pipeline:
held_stages: []
# mlx gate 동시 실행 상한 (2026-06-12 fair-share): 구 "1 고정" 룰의 전제(single-inference
# 서버)가 소멸 — 현 mlx_vlm 은 continuous batching (2026-06-11 밤 6~8 concurrent 실측 정상).
# 2 = 워커 LLM 호출과 인터랙티브(ask/eid)가 서로 안 막힘 + 집계 throughput ~1.8배.
# 게이트(상한+우선순위)는 유지 — thundering herd 방지. 1 로 되돌리면 구 동작.
mlx_gate_concurrency: 2
@@ -0,0 +1,419 @@
<script lang="ts">
// 처리 머신 보드 v2 — 파이프라인 흐름 뷰 (plan ds-board-engines-1, R2 통합안).
// 메인 = 좌→우 흐름 노드(병목 amber·실패 뱃지), 노드 클릭 = 상세 패널(안1 변형),
// 실패 뱃지 클릭 = 실패 처리 드로어 (재시도/건너뛰기 — 영구 실패의 유일한 조치 경로).
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어 열 때).
import { api } from '$lib/api';
import { refreshQueueOverview } from '$lib/stores/queueOverview';
import { addToast } from '$lib/stores/toast';
import {
AUX_NODES,
FLOW_NODES,
MACHINE_META,
type FlowNodeDef,
etaShort,
flowStageLabel,
formatAgeSec,
formatRate,
} from '$lib/utils/queueDisplay';
import type {
FailedItem,
FailedListResponse,
MachineCurrentItem,
QueueOverview,
QueueStageRow,
RetryResponse,
SkipResponse,
} from '$lib/types/queue';
let { overview }: { overview: QueueOverview } = $props();
// ─── 노드 통계 합성 ───
interface NodeStats {
def: FlowNodeDef;
/** 다중 stage 노드(청크·임베딩)는 같은 문서가 양쪽 큐에 있어 max — 합산 = 이중계산 */
pending: number;
processing: number;
failed: number; // 실패는 행 단위 사실이라 합산
done1h: number;
created1h: number;
doneToday: number;
oldestAgeSec: number | null;
etaMinutes: number | null;
inflowDominant: boolean;
perStage: QueueStageRow[];
}
const stageBy = $derived(new Map(overview.stages.map((s) => [s.stage, s])));
function nodeStats(def: FlowNodeDef): NodeStats {
const rows = def.stages
.map((s) => stageBy.get(s))
.filter((r): r is QueueStageRow => r != null);
const pending = rows.reduce((m, r) => Math.max(m, r.pending), 0);
const done1h = rows.reduce((m, r) => Math.max(m, r.done_1h), 0);
const created1h = rows.reduce((m, r) => Math.max(m, r.created_1h), 0);
const oldest = rows.reduce<number | null>(
(m, r) => (r.oldest_pending_age_sec == null ? m : Math.max(m ?? 0, r.oldest_pending_age_sec)),
null,
);
return {
def,
pending,
processing: rows.reduce((s, r) => s + r.processing, 0),
failed: rows.reduce((s, r) => s + r.failed, 0),
done1h,
created1h,
doneToday: rows.reduce((m, r) => Math.max(m, r.done_today), 0),
oldestAgeSec: oldest,
etaMinutes: pending > 0 && done1h > 0 ? Math.round((pending / done1h) * 60) : null,
inflowDominant: pending > 0 && created1h > done1h,
perStage: rows,
};
}
const mainNodes = $derived(FLOW_NODES.map(nodeStats));
const auxAll = $derived(AUX_NODES.map(nodeStats));
const auxActive = $derived(
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday > 0),
);
const auxIdle = $derived(
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday === 0),
);
const totalFailed = $derived(overview.totals.failed);
// 머신 스트립 — overview.machines 의 state/처리율 + 정적 모델 메타
const machineStrip = $derived(
overview.machines.map((m) => ({
...m,
meta: MACHINE_META[m.key],
})),
);
// ─── 선택 상태 (노드 상세 / 실패 드로어 — 동시에 하나만) ───
let selected = $state<string | null>(null);
let failOpen = $state(false);
function toggleNode(key: string) {
selected = selected === key ? null : key;
if (selected) failOpen = false;
}
const selectedNode = $derived(
[...mainNodes, ...auxAll].find((n) => n.def.key === selected) ?? null,
);
function nodeCurrent(def: FlowNodeDef): MachineCurrentItem[] {
return overview.machines.flatMap((m) => m.current.filter((c) => def.stages.includes(c.stage)));
}
// ─── 실패 드로어 ───
let failItems = $state<FailedItem[]>([]);
let failLoading = $state(false);
let busy = $state(false);
let expanded = $state<Record<string, boolean>>({});
async function openFailures() {
failOpen = true;
selected = null;
await loadFailures();
}
async function loadFailures() {
failLoading = true;
try {
const r = await api<FailedListResponse>('/queue/failed');
failItems = r.items;
} catch {
addToast('error', '실패 목록을 불러오지 못했습니다');
} finally {
failLoading = false;
}
}
interface FailGroup {
key: string;
stage: string;
pattern: string;
items: FailedItem[];
}
// 그룹핑 = stage + 에러 메시지 prefix(36자) — 같은 원인(ReadTimeout 등) 묶음
const failGroups = $derived.by(() => {
const map = new Map<string, FailGroup>();
for (const it of failItems) {
const pattern = (it.error_message ?? '(메시지 없음)').slice(0, 36);
const key = `${it.stage}::${pattern}`;
const g = map.get(key);
if (g) g.items.push(it);
else map.set(key, { key, stage: it.stage, pattern, items: [it] });
}
return [...map.values()].sort(
(a, b) => a.stage.localeCompare(b.stage) || b.items.length - a.items.length,
);
});
async function retryIds(ids: number[]) {
if (busy || ids.length === 0) return;
busy = true;
try {
const r = await api<RetryResponse>('/queue/retry', {
method: 'POST',
body: JSON.stringify({ ids }),
});
addToast(
'success',
`재시도 ${r.retried}건 큐 재진입${r.not_retried > 0 ? ` (${r.not_retried} 제외 이미 활성/처리됨)` : ''}`,
);
await afterAction();
} catch {
addToast('error', '재시도 요청 실패');
} finally {
busy = false;
}
}
async function skipIds(ids: number[]) {
if (busy || ids.length === 0) return;
busy = true;
try {
const r = await api<SkipResponse>('/queue/skip', {
method: 'POST',
body: JSON.stringify({ ids }),
});
addToast('success', `건너뛰기 ${r.skipped}건 처리 (해당 단계 제외)`);
await afterAction();
} catch {
addToast('error', '건너뛰기 요청 실패');
} finally {
busy = false;
}
}
async function afterAction() {
await Promise.all([loadFailures(), refreshQueueOverview()]);
}
// ─── trend_24h 스파크라인 (summarize 유입 vs 소화 — API 가 주는데 미렌더이던 슬롯) ───
const spark = $derived.by(() => {
const t = overview.trend_24h;
if (!t || t.length === 0) return null;
const max = Math.max(1, ...t.map((b) => Math.max(b.inflow, b.done)));
const w = 120;
const h = 24;
const step = w / Math.max(1, t.length - 1);
const pts = (sel: (b: (typeof t)[number]) => number) =>
t.map((b, i) => `${(i * step).toFixed(1)},${(h - (sel(b) / max) * (h - 3) + 1).toFixed(1)}`).join(' ');
return { inflow: pts((b) => b.inflow), done: pts((b) => b.done) };
});
</script>
<div class="mt-5">
<!-- 헤더: 타이틀 + 요약 24h 스파크라인 + 실패 합계 -->
<div class="flex items-center justify-between gap-3 mb-3">
<div class="text-[11px] font-bold text-dim uppercase tracking-wider">처리 머신</div>
<div class="flex items-center gap-3">
{#if totalFailed > 0}
<button
class="text-[11px] font-semibold text-error hover:underline cursor-pointer"
onclick={openFailures}
>실패 {totalFailed}건 처리</button>
{/if}
{#if spark}
<div class="flex items-center gap-2 text-[10px] text-faint tabular-nums" title="요약(summarize) 단계 24시간 — 유입(회색) vs 소화(녹색)">
<svg width="120" height="24" viewBox="0 0 120 24" class="block">
<polyline points={spark.inflow} fill="none" stroke="currentColor" stroke-width="1.5" class="text-faint" />
<polyline points={spark.done} fill="none" stroke="currentColor" stroke-width="1.5" class="text-success" />
</svg>
<span>요약 24h 유입/소화</span>
</div>
{/if}
</div>
</div>
<!-- 머신 스트립 -->
<div class="flex flex-wrap gap-2 mb-3">
{#each machineStrip as m (m.key)}
<div class="flex items-center gap-2 bg-surface border border-default rounded-full px-3.5 py-1.5 text-xs">
<span class="w-2 h-2 rounded-full shrink-0 {m.state === 'active' ? 'bg-success' : m.state === 'deferred' ? 'bg-warning' : 'bg-faint'}"></span>
<span class="font-bold text-text">{m.meta?.label ?? m.label}</span>
<span class="text-[10px] text-faint font-mono">{m.meta?.model}</span>
<span class="text-[11px] text-dim tabular-nums">{formatRate(m.done_1h)}/h</span>
{#if m.key === 'macbook' && m.deferred_pending > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {m.deferred_pending}</span>
{/if}
</div>
{/each}
</div>
<!-- 흐름 노드 -->
<div class="flex items-stretch overflow-x-auto pb-1">
{#each mainNodes as n, i (n.def.key)}
{#if i > 0}
<div class="flex items-center text-faint text-sm px-1.5 shrink-0" aria-hidden="true"></div>
{/if}
<div
class="relative bg-surface border-[1.5px] rounded-card px-3 py-2.5 min-w-[124px] shrink-0 text-left transition-colors cursor-pointer hover:bg-surface-hover
{n.inflowDominant ? 'border-warning' : n.etaMinutes != null && n.def.stages.includes('chunk') ? 'border-success' : 'border-default'}
{selected === n.def.key ? 'node-sel' : ''}"
role="button"
tabindex="0"
onclick={() => toggleNode(n.def.key)}
onkeydown={(e) => { if (e.key === 'Enter' || e.key === ' ') { e.preventDefault(); toggleNode(n.def.key); } }}
title="{n.def.label} — 클릭하면 상세"
>
{#if n.failed > 0}
<button
class="absolute -top-2 -right-1.5 text-[9px] font-extrabold bg-error text-white rounded-full px-1.5 py-px shadow cursor-pointer"
onclick={(e) => { e.stopPropagation(); openFailures(); }}
title="실패 {n.failed}건 — 클릭하면 실패 처리"
>{n.failed}</button>
{/if}
<span class="inline-block text-[9px] font-bold rounded px-1.5 py-px mb-1.5 mtag-{n.def.machine}">
{MACHINE_META[n.def.machine].label} · {n.def.engine}
</span>
<div class="text-xs font-bold text-text flex items-center gap-1.5">
{n.def.label}
{#if n.processing > 0}
<span class="inline-block w-1.5 h-1.5 rounded-full bg-accent animate-pulse" title="처리 중 {n.processing}"></span>
{/if}
{#if n.inflowDominant}
<span class="text-[9px] font-bold text-warning">유입 우세</span>
{/if}
</div>
<div class="text-base font-extrabold tabular-nums tracking-tight leading-tight mt-0.5 text-text">
{n.pending.toLocaleString()}
</div>
<div class="text-[10px] text-dim tabular-nums">
{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}
{#if n.etaMinutes != null && !n.inflowDominant && n.pending > 0}
· <span class="text-accent font-semibold">{etaShort(n.etaMinutes)}</span>
{/if}
</div>
</div>
{/each}
</div>
<!-- 보조 라인 -->
<p class="text-[10px] text-faint mt-1.5 tabular-nums">
{#each auxActive as n, i (n.def.key)}
{i > 0 ? ' · ' : '보조: '}{n.def.label}({n.def.engine}) 대기 {n.pending.toLocaleString()} · {formatRate(n.done1h)}/h{n.failed > 0 ? ` · 실패 ${n.failed}` : ''}
{/each}
{#if auxIdle.length > 0}
{auxActive.length > 0 ? ' — ' : ''}한가: {auxIdle.map((n) => n.def.label).join(' · ')}
{/if}
— 뉴스 등 일부 소스는 분류/추출을 건너뜀 (흐름 그림은 대표 경로)
</p>
<!-- 상세 패널 (노드 클릭) -->
{#if selectedNode}
<div class="border rounded-card mt-3 overflow-hidden bg-surface detail-frame">
<div class="flex items-center gap-2.5 px-4 py-2.5 text-xs font-bold detail-head">
{selectedNode.def.label}{selectedNode.def.engine}
<span class="text-[10px] font-mono font-medium text-dim bg-surface border border-default rounded px-1.5">{selectedNode.def.sub} · {MACHINE_META[selectedNode.def.machine].label}</span>
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (selected = null)}>닫기</button>
</div>
<div class="px-4 pb-3.5">
<div class="grid grid-cols-2 md:grid-cols-4 gap-2.5 my-2.5">
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">대기</div>
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.pending.toLocaleString()}</div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">처리율 (1h)</div>
<div class="text-lg font-extrabold tabular-nums text-text">{formatRate(selectedNode.done1h)}<span class="text-[11px] text-dim font-semibold">/h</span></div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">오늘 완료</div>
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.doneToday.toLocaleString()}</div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">소진 예상</div>
<div class="text-lg font-extrabold tabular-nums {selectedNode.inflowDominant ? 'text-warning' : 'text-accent'}">
{#if selectedNode.inflowDominant}유입 우세{:else if selectedNode.etaMinutes != null}{etaShort(selectedNode.etaMinutes)}{:else if selectedNode.pending === 0}한가{:else}{/if}
</div>
</div>
</div>
{#if selectedNode.perStage.length > 1}
{#each selectedNode.perStage as row (row.stage)}
<div class="flex items-center gap-2.5 py-1.5 border-t border-default text-xs">
<span class="font-semibold text-text min-w-[72px]">{flowStageLabel(row.stage)}</span>
<span class="ml-auto text-dim tabular-nums">
대기 <strong class="text-text">{row.pending.toLocaleString()}</strong>
· {formatRate(row.done_1h)}/h · 오늘 {row.done_today.toLocaleString()}
{#if row.failed > 0}· <span class="text-error font-semibold">실패 {row.failed}</span>{/if}
</span>
</div>
{/each}
{/if}
<div class="text-[11px] text-dim border-t border-dashed border-default mt-2 pt-2 tabular-nums">
{#if selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600}
가장 오래 기다린 항목 {formatAgeSec(selectedNode.oldestAgeSec)}
{/if}
{#each nodeCurrent(selectedNode.def) as c, i (c.document_id + c.stage)}
{i === 0 && !(selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600) ? '' : ' · '}지금: {c.title} ({flowStageLabel(c.stage)})
{/each}
{#if selectedNode.failed > 0}
· <button class="text-error font-semibold cursor-pointer hover:underline" onclick={openFailures}>실패 {selectedNode.failed} 처리</button>
{/if}
</div>
</div>
</div>
{/if}
<!-- 실패 처리 드로어 -->
{#if failOpen}
<div class="border border-error/40 rounded-card mt-3 overflow-hidden bg-surface">
<div class="flex items-center gap-2.5 px-4 py-2.5 bg-error/5 text-xs font-bold text-text">
실패 처리
<span class="text-[10px] font-semibold text-error">영구 실패 {failItems.length}건 — 자동 재시도 3회 소진, 수동 조치 대기</span>
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (failOpen = false)}>닫기</button>
</div>
{#if failLoading}
<p class="text-xs text-dim text-center py-4">불러오는 중…</p>
{:else if failItems.length === 0}
<p class="text-xs text-dim text-center py-4">영구 실패 항목 없음</p>
{:else}
{#each failGroups as g (g.key)}
<div class="px-4 py-2.5 border-t border-default">
<div class="flex items-center gap-2 flex-wrap text-xs font-bold text-text mb-1">
{flowStageLabel(g.stage)} {g.items.length}
<span class="text-[10px] font-mono font-medium text-error bg-error/10 rounded px-1.5 py-px">{g.pattern}{g.items[0]?.error_message && g.items[0].error_message.length > 36 ? '…' : ''}</span>
</div>
{#each expanded[g.key] ? g.items : g.items.slice(0, 4) as it (it.id)}
<div class="flex items-center gap-2.5 py-1 border-t border-dashed border-default/60 text-xs">
<span class="flex-1 min-w-0 truncate text-text" title={it.title}>{it.title}</span>
<span class="text-[10px] font-mono text-faint shrink-0 tabular-nums">시도 {it.attempts}/{it.max_attempts}</span>
<span class="text-[10px] font-mono text-error shrink-0 max-w-[260px] truncate" title={it.error_message ?? ''}>{it.error_message ?? ''}</span>
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds([it.id])}>재시도</button>
<button class="text-[10px] font-bold border border-default text-faint rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds([it.id])}>건너뛰기</button>
</div>
{/each}
{#if g.items.length > 4 && !expanded[g.key]}
<button class="text-[10px] text-dim cursor-pointer hover:text-text mt-1" onclick={() => (expanded = { ...expanded, [g.key]: true })}> {g.items.length - 4} 펼치기</button>
{/if}
{#if g.items.length > 1}
<div class="flex gap-2 mt-1.5">
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2.5 py-0.5 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds(g.items.map((x) => x.id))}>그룹 전체 재시도 ({g.items.length})</button>
<button class="text-[10px] font-bold border border-default text-faint rounded px-2.5 py-0.5 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds(g.items.map((x) => x.id))}>그룹 전체 건너뛰기</button>
</div>
{/if}
</div>
{/each}
<p class="text-[10px] text-faint px-4 py-2 border-t border-default">
재시도 = 시도 횟수 리셋 후 큐 재진입 (자동 재시도 3회 새로 부여) · 건너뛰기 = 이 단계 완료 처리(후속 단계 연쇄 없음, 감사 마킹) · 같은 오류가 반복되는 항목(빈 텍스트 등)은 건너뛰기 권장
</p>
{/if}
</div>
{/if}
</div>
<style>
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */
.mtag-gpu { background: #e7eef6; color: #3b6ea5; }
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
.mtag-macbook { background: #f7eedd; color: #b07a10; }
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
.detail-frame { border-color: #3b6ea5; }
.detail-head { background: #e7eef6; }
</style>
@@ -0,0 +1,31 @@
<!--
EidEvidenceCard — 이드 채팅 deep(검색) 답변의 근거 카드 (ds-eid-ask-absorb P1).
ReactResult.sources = {id, doc_id, title, score} (citation 번호 n 없음 — /ask 의 Citation 과
다름) → 순서 기반 번호([1],[2]...). 1단계 카드 = 제목·출처·점수 (스니펫은 react_loop
_result_payload items_src 에 없음 — 2단계 후보). 접이식 <details> 로 채팅 흐름 보존.
디자인 토큰만 (CLAUDE.md lint:tokens).
-->
<script lang="ts">
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
let { sources, partial = false }: { sources: EidSource[]; partial?: boolean } = $props();
</script>
{#if sources.length}
<details class="mt-2 rounded-lg border border-default bg-surface text-xs max-w-[85%] sm:max-w-[75%]">
<summary class="cursor-pointer px-3 py-2 text-dim hover:text-text select-none font-semibold">
근거 {sources.length}{partial ? ' · 부분 답변 (확정 근거 부족)' : ''}
</summary>
<ul class="px-3 pb-2.5 flex flex-col gap-1.5">
{#each sources as src, i (src.id ?? i)}
<li class="flex items-start gap-2">
<span class="text-accent font-bold shrink-0">[{i + 1}]</span>
<span class="flex-1 min-w-0 text-text break-words">{src.title || `문서 ${src.doc_id ?? '?'}`}</span>
{#if typeof src.score === 'number'}
<span class="text-faint shrink-0 tabular-nums">{src.score.toFixed(2)}</span>
{/if}
</li>
{/each}
</ul>
</details>
{/if}
+34
View File
@@ -61,6 +61,10 @@ export interface QueueStageRow {
pending: number;
processing: number;
failed: number;
/** 최근 1시간 완료 — 노드 처리율·ETA 재료 (ds-board-engines-1) */
done_1h: number;
/** 최근 1시간 유입 — 유입 우세 판정 재료 (ds-board-engines-1) */
created_1h: number;
done_today: number;
oldest_pending_age_sec: number | null;
}
@@ -72,3 +76,33 @@ export interface QueueOverview {
stages: QueueStageRow[];
totals: QueueTotals;
}
/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */
export interface FailedItem {
id: number;
stage: string;
document_id: number;
title: string;
attempts: number;
max_attempts: number;
error_message: string | null;
failed_at: string | null;
}
export interface FailedListResponse {
items: FailedItem[];
total: number;
}
export interface RetryResponse {
requested: number;
retried: number;
not_retried: number;
}
export interface SkipResponse {
requested: number;
skipped: number;
not_skipped: number;
}
+79
View File
@@ -36,3 +36,82 @@ export function etaPhrase(minutes: number): string {
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
return `${text}시간 후 소진 예상`;
}
/** ETA 분 → 칩용 짧은 표기 ("약 4.6시간" / "약 12분") */
export function etaShort(minutes: number): string {
if (minutes < 60) return `${Math.max(1, Math.round(minutes))}`;
const hours = minutes / 60;
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
return `${text}시간`;
}
/** 경과 초 → "N분 전 / N시간 전 / N일 전" */
export function formatAgeSec(sec: number): string {
if (sec < 3600) return `${Math.max(1, Math.round(sec / 60))}분 전`;
if (sec < 86400) return `${Math.round(sec / 3600)}시간 전`;
return `${Math.round(sec / 86400)}일 전`;
}
/* (plan ds-board-engines-1)
* stage / () / . API label
* (raw ), · .
* / 1 (: 맥미니 ).
*/
export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
export interface FlowNodeDef {
key: string;
/** 노드 표시명 */
label: string;
/** 합산할 stage 키 (다중 = 같은 엔진 공유) */
stages: string[];
machine: FlowMachine;
/** 엔진/모델 표시명 (FE 정적 — 모델 교체 시 여기 수정) */
engine: string;
/** 보조 표기 (서비스/워커명) */
sub: string;
}
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
export const FLOW_NODES: FlowNodeDef[] = [
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
];
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
export const AUX_NODES: FlowNodeDef[] = [
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
];
/** 머신 스트립 메타 — 모델 표기 단일 지점 */
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
gpu: { label: 'GPU 서버', model: '특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
};
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
export const FLOW_STAGE_LABEL: Record<string, string> = {
extract: '추출',
classify: '분류',
summarize: '요약',
embed: '임베딩',
chunk: '청크',
preview: '미리보기',
stt: '전사',
thumbnail: '썸네일',
deep_summary: '심층분석',
markdown: '마크다운',
fulltext: '전문',
};
export function flowStageLabel(stage: string): string {
return FLOW_STAGE_LABEL[stage] ?? stage;
}
+3 -61
View File
@@ -14,9 +14,7 @@
import { user } from '$lib/stores/auth';
import { api } from '$lib/api';
import { queueOverview, refreshQueueOverview } from '$lib/stores/queueOverview';
import {
MACHINE_STATE_LABEL, machineChipClass, machineDotClass, formatRate, etaPhrase,
} from '$lib/utils/queueDisplay';
import ProcessingFlowBoard from '$lib/components/ProcessingFlowBoard.svelte';
import type { QueueOverview } from '$lib/types/queue';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
@@ -460,65 +458,9 @@
</div>
</div>
<!-- ═══ 처리 머신 보드 (안2) + ETA 라인 (안5 라이트) ═══ -->
<!-- ═══ 처리 머신 보드 v2 — 파이프라인 흐름 + 상세 패널 + 실패 드로어 (ds-board-engines-1) ═══ -->
{#if queue}
<div class="mt-5">
<div class="text-[11px] font-bold text-dim uppercase tracking-wider mb-3">처리 머신</div>
<div class="grid grid-cols-1 md:grid-cols-3 gap-3">
{#each queue.machines as m (m.key)}
<div class="bg-surface border border-default rounded-card p-4">
<!-- 헤더: 상태 dot + 라벨 + state 칩 -->
<div class="flex items-center justify-between gap-2 mb-2">
<span class="flex items-center gap-2 text-[13px] font-bold text-text min-w-0">
<span class="w-2 h-2 rounded-full shrink-0 {machineDotClass(m.state)}"></span>
<span class="truncate">{m.label}</span>
</span>
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(m.state)}">
{MACHINE_STATE_LABEL[m.state]}
</span>
</div>
<!-- 담당 단계 칩 -->
{#if m.stages.length > 0}
<div class="flex flex-wrap gap-1 mb-2.5">
{#each m.stages as s (s)}
<span class="text-[10px] font-semibold rounded-full px-2 py-0.5 bg-surface-hover text-dim">{queueStageLabel(s)}</span>
{/each}
</div>
{/if}
<!-- 대기 · 처리율 · 오늘 -->
<div class="text-xs text-dim tabular-nums">
대기 <strong class="text-text">{m.pending.toLocaleString()}</strong>
· 처리율 <strong class="text-text">{formatRate(m.done_1h)}/h</strong>
· 오늘 <strong class="text-text">{m.done_today.toLocaleString()}</strong>
</div>
<!-- 맥북 보류 (sleep 등 자동 재개 대기) -->
{#if m.key === 'macbook' && m.deferred_pending > 0}
<div class="text-[11px] font-semibold text-warning mt-1.5 tabular-nums">보류 {m.deferred_pending.toLocaleString()}건 — 자동 재개 대기</div>
{/if}
<!-- 지금 처리 중인 문서 -->
{#if m.current.length > 0}
<div class="text-[11px] text-dim border-t border-dashed border-default mt-2.5 pt-2 truncate"
title={m.current.map((c) => `${c.title} (${queueStageLabel(c.stage)})`).join(' · ')}>
지금: {m.current[0].title} ({queueStageLabel(m.current[0].stage)}){m.current.length > 1 ? ` 외 ${m.current.length - 1}` : ''}
</div>
{/if}
</div>
{/each}
</div>
<!-- ETA 한 줄 (안5 라이트 — 추정치) -->
<div class="text-xs text-dim mt-2.5 px-1 tabular-nums"
title="현재 페이스 기반 추정치 — 유입 변동 시 달라질 수 있습니다">
요약 대기 <strong class="text-text">{queue.summarize_eta.pending.toLocaleString()}</strong>
— 소화 {formatRate(queue.summarize_eta.done_rate_1h)}/h
· 유입 {formatRate(queue.summarize_eta.inflow_rate_1h)}/h
{#if queue.summarize_eta.eta_minutes != null}
· <span class="text-accent font-semibold">{etaPhrase(queue.summarize_eta.eta_minutes)}</span>
{:else}
· 유입 우세(백필 중)
{/if}
</div>
</div>
<ProcessingFlowBoard overview={queue} />
{/if}
<!-- ═══ 단계 상세 (기존 stage 테이블 — 접힘 강등, 실패 있을 때 자동 펼침) ═══ -->
+194 -10
View File
@@ -17,6 +17,12 @@
macbook_unavailable / substrate_degraded / 기타 detail). 자동 fallback
금지 — 다른 모드로 자동 전환하지 않는다. 스트림 도중 중단 = 받은 부분
유지 + 표시.
- 대기 표시(첫 바이트 전): 경과 타이머 1초 갱신 + 3초 후 GET /api/eid/status
1회·이후 8초 간격 재조회(실패는 조용히 무시 — 기능 비차단)로 "대기"와
"고장"을 정직하게 구분. daily.busy=true 면 줄 서는 중 안내. 15초 경과 +
daily 모드면 [심층으로 전환]/[취소] 버튼 노출 — 전환은 명시 클릭만
(자동 fallback 금지 정책 위반 아님). 첫 바이트 도착/스트림 종료 시
타이머·폴링 즉시 정리.
- 이력: localStorage `eid_chat:v1` (키 상수는 $lib/eidChat — logout 시 제거와 공유).
전송 payload 는 마지막 20턴(40 messages) cap.
- 입력 한도: 메시지당 8,000자 클라 선차단(서버 422 검증과 동일 한도).
@@ -25,15 +31,25 @@
-->
<script lang="ts">
import { onMount, onDestroy } from 'svelte';
import { apiFetchRaw } from '$lib/api';
import { api, apiFetchRaw } from '$lib/api';
import { EID_CHAT_STORAGE_KEY } from '$lib/eidChat';
import Button from '$lib/components/ui/Button.svelte';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import EidEvidenceCard from '$lib/components/eid/EidEvidenceCard.svelte';
import { MessageCircle, SendHorizontal, RotateCcw, AlertCircle } from 'lucide-svelte';
type ChatMode = 'daily' | 'deep';
type ChatMessage = { role: 'user' | 'assistant'; content: string };
// deep(검색) 답변은 sources(근거)·partial 동반. daily 답변은 없음.
type EidSource = { id?: number; doc_id?: number; title?: string; score?: number };
type ChatMessage = {
role: 'user' | 'assistant';
content: string;
sources?: EidSource[];
partial?: boolean;
};
type Notice = { kind: 'warn' | 'error'; message: string; retryable: boolean };
// GET /api/eid/status 응답 — 대기 중 바쁨 신호 조회에 필요한 필드만 좁게 정의
type EidStatus = { daily?: { busy?: boolean } };
// 이력 키 — logout(stores/auth.ts) 의 이력 제거와 단일 상수 공유
const STORAGE_KEY = EID_CHAT_STORAGE_KEY;
@@ -45,6 +61,10 @@
const MAX_MESSAGE_CHARS = 8000;
// 한도 근접 카운터 노출 시작점
const COUNTER_THRESHOLD = 7500;
// 대기 표시(첫 바이트 전): 상태 폴링 시작 시점(초) / 재조회 간격(초) / 행동 버튼 노출 시점(초)
const STATUS_POLL_START_SEC = 3;
const STATUS_POLL_INTERVAL_SEC = 8;
const WAIT_ACTIONS_SEC = 15;
const DEEP_CAPTION =
'장문·무거운 질문에 적합 — 잠들어 있으면 자동 기동 (처음 응답까지 최대 ~1분)';
@@ -64,11 +84,72 @@
let streaming = $state(false);
let streamingText = $state('');
let notice = $state<Notice | null>(null);
// deep(검색) 모드 첫 바이트 전 단계 — 'searching' 이면 대기 표시를 "근거 검색 중"으로
let deepPhase = $state<'searching' | null>(null);
let scrollEl: HTMLDivElement | undefined = $state();
let textareaEl: HTMLTextAreaElement | undefined = $state();
let abortCtrl: AbortController | null = null;
// ── 대기 추적 (첫 바이트 전) ────────────────────────
// 경과 초 + daily 엔진 바쁨 여부(null = 미확인). 토큰(세대 카운터)으로
// 스트림별 소유를 구분 — abort 직후 즉시 재전송(심층 전환) 경로에서
// 이전 스트림의 늦은 정리가 새 스트림의 타이머를 죽이지 않게 한다.
let waitSeconds = $state(0);
let dailyBusy = $state<boolean | null>(null);
let waitIntervalId: ReturnType<typeof setInterval> | null = null;
let waitTokenSeq = 0;
let waitToken = 0; // 현재 활성 추적 토큰 (0 = 추적 없음)
function startWaitTracking(streamMode: ChatMode): number {
// 이전 추적 잔여 정리 (전환 재전송처럼 stop 전에 start 가 오는 경로 방어)
if (waitIntervalId !== null) {
clearInterval(waitIntervalId);
waitIntervalId = null;
}
const token = ++waitTokenSeq;
waitToken = token;
waitSeconds = 0;
dailyBusy = null;
waitIntervalId = setInterval(() => {
if (waitToken !== token) return; // 정리 누락 방어 — 무해 no-op
waitSeconds += 1;
// 바쁨 신호 폴링: 3초 경과 시 1회 + 이후 8초 간격 (3, 11, 19, ...).
// daily 모드 전용 — deep 대기는 기존 wake 안내 + 경과 타이머만.
if (
streamMode === 'daily' &&
waitSeconds >= STATUS_POLL_START_SEC &&
(waitSeconds - STATUS_POLL_START_SEC) % STATUS_POLL_INTERVAL_SEC === 0
) {
void pollEidStatus(token);
}
}, 1000);
return token;
}
// token 가드: 본인 소유 추적만 정리 — 다른 스트림이 이어받았으면 no-op
function stopWaitTracking(token: number) {
if (token !== waitToken) return;
waitToken = 0;
if (waitIntervalId !== null) {
clearInterval(waitIntervalId);
waitIntervalId = null;
}
waitSeconds = 0;
dailyBusy = null;
}
// 상태 조회 — 실패는 조용히 무시 (대기 표시는 타이머만으로 유지, 기능 비차단)
async function pollEidStatus(token: number) {
try {
const status = await api<EidStatus>('/eid/status');
if (token !== waitToken) return; // 스트림 종료/교체 후 도착한 늦은 응답 폐기
dailyBusy = status?.daily?.busy === true;
} catch {
// 무시 — 바쁨 신호는 부가 정보일 뿐 채팅 기능을 차단하지 않는다
}
}
// ── localStorage 이력 ───────────────────────────────
function persist() {
if (typeof window === 'undefined') return;
@@ -97,9 +178,15 @@
typeof (m as ChatMessage).content === 'string'
)
// 배열 크기 가드 + content 8,000자 clamp — 외부에서 손상/비대해진
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화
// 이력이 전송 payload 를 오염시키지 않도록 복원 시점에 정규화.
// sources/partial(deep 답변 근거)은 보존 — 전송 payload 엔 안 실림(runStream map 이 role/content 만).
.slice(-MAX_STORED_MESSAGES)
.map((m) => ({ role: m.role, content: m.content.slice(0, MAX_MESSAGE_CHARS) }));
.map((m) => ({
role: m.role,
content: m.content.slice(0, MAX_MESSAGE_CHARS),
sources: Array.isArray((m as ChatMessage).sources) ? (m as ChatMessage).sources : undefined,
partial: (m as ChatMessage).partial === true || undefined,
}));
}
} catch {
// 손상된 이력은 무시 (새 대화로 시작)
@@ -107,7 +194,11 @@
}
onMount(() => restore());
onDestroy(() => abortCtrl?.abort());
onDestroy(() => {
abortCtrl?.abort();
// 페이지 이탈 시 대기 타이머/폴링 정리 (abort 의 finally 와 이중이어도 무해)
if (waitIntervalId !== null) clearInterval(waitIntervalId);
});
// ── 자동 스크롤 (새 메시지 / 스트림 청크마다 하단 고정) ──
$effect(() => {
@@ -235,12 +326,39 @@
void runStream();
}
// ── 대기 중 행동 버튼 (daily + 15초 경과) ────────────
// [심층으로 전환] — 명시 클릭에 의한 모드 전환 (자동 fallback 금지 정책
// 위반 아님). 현재 fetch abort → 같은 user 턴을 mode=deep 으로 즉시 재전송.
// abort 된 이전 스트림의 finally 는 abortCtrl 비교 + 대기 token 가드로
// 새 스트림 상태를 건드리지 않는다 (새 대화 abort race 가드와 동일 구조).
function switchToDeep() {
if (!streaming || mode !== 'daily') return;
mode = 'deep'; // 모드 토글 상태도 deep 으로 갱신
abortCtrl?.abort();
void runStream();
}
// [취소] — abort 후 방금 push 한 user 턴 pop + 입력창 본문 복원
// (422 처리와 동일 패턴: 이력 오염 차단 + localStorage 재저장).
// placeholder 제거는 abort 된 스트림의 finally(streaming=false)가 처리.
function cancelWait() {
if (!streaming) return;
abortCtrl?.abort();
if (messages.length > 0 && messages[messages.length - 1].role === 'user') {
const popped = messages.pop();
if (popped && !input) input = popped.content;
persist();
}
}
async function runStream() {
notice = null;
streaming = true;
streamingText = '';
const ctrl = new AbortController();
abortCtrl = ctrl;
// 첫 바이트 전 대기 추적 시작 — 본 스트림 소유 토큰으로 정리 시점 제어
const waitTok = startWaitTracking(mode);
const payload = {
mode,
@@ -251,6 +369,9 @@
let acc = '';
let sawDone = false;
// deep(검색) 답변 동반 데이터 — daily 는 안 옴
let accSources: EidSource[] = [];
let accPartial = false;
try {
const res = await apiFetchRaw('/eid/chat', {
@@ -301,9 +422,35 @@
try {
const obj = JSON.parse(data) as {
choices?: Array<{ delta?: { content?: unknown } }>;
phase?: string;
error_reason?: string;
eid_sources?: EidSource[];
partial?: boolean;
};
// deep(검색) envelope 분기 — daily 응답엔 없음
if (obj?.phase === 'ping') return false; // heartbeat — 무시
if (obj?.phase === 'searching') {
deepPhase = 'searching'; // 대기 표시를 "근거 검색 중"으로
return false;
}
if (obj?.phase === 'error') {
// in-stream 미가용/실패 — 받은 부분 유지 + 명시 표시 (자동 fallback 0).
// 뒤따르는 [DONE] 이 sawDone 처리하므로 '중단' 오경보 없음.
notice = mapErrorReason(obj.error_reason, '');
return false;
}
if (Array.isArray(obj?.eid_sources)) {
accSources = obj.eid_sources;
accPartial = obj.partial === true;
return false;
}
const piece = obj?.choices?.[0]?.delta?.content;
if (typeof piece === 'string' && piece) {
// 첫 바이트 도착 — 대기 타이머/폴링 제거, 기존 스트리밍 표시로 전환
if (!acc) {
stopWaitTracking(waitTok);
deepPhase = null;
}
acc += piece;
streamingText = acc;
}
@@ -356,7 +503,7 @@
}
} catch (err) {
if ((err as Error)?.name === 'AbortError') {
// 새 대화 등 사용자 의도 중단 — 안내 불필요
// 새 대화 / 대기 취소 / 심층 전환 등 사용자 의도 중단 — 안내 불필요
return;
}
// 스트림 도중 네트워크 에러 — 받은 부분 유지 + 표시
@@ -368,14 +515,23 @@
}
: { kind: 'error', message: '요청에 실패했습니다 — 네트워크를 확인하세요.', retryable: true };
} finally {
// 스트림 종료 — 대기 타이머/폴링 정리. 첫 바이트에서 이미 정리됐거나
// 전환 재전송으로 새 스트림이 추적을 이어받았으면 token 가드로 no-op.
stopWaitTracking(waitTok);
// abort(새 대화/페이지 이탈) 시에는 push 하지 않음 — 새 대화로 비운
// messages 에 이전 스트림 잔여분이 흘러들어가는 race 방지.
if (acc && !ctrl.signal.aborted) {
messages.push({ role: 'assistant', content: acc });
messages.push({
role: 'assistant',
content: acc,
sources: accSources.length ? accSources : undefined,
partial: accPartial || undefined,
});
}
if (abortCtrl === ctrl) {
streaming = false;
streamingText = '';
deepPhase = null;
abortCtrl = null;
}
persist();
@@ -398,6 +554,24 @@
// 입력 길이(전송 기준 = trim 후) — 7,500자부터 카운터 노출, 8,000자 초과 차단
let inputLength = $derived(input.trim().length);
let overLimit = $derived(inputLength > MAX_MESSAGE_CHARS);
// 첫 바이트 전 placeholder 문구 — "대기"와 "고장"의 정직한 구분:
// 바쁨 확인 = 줄 서는 중 / 비-바쁨 확인 = 생성 준비 중 / 미확인 = 응답 대기 중.
// deep 모드는 폴링하지 않으므로 항상 미확인(타이머만) — wake 안내는 헤더 caption.
let waitPlaceholder = $derived(
deepPhase === 'searching'
? `이드가 문서·뉴스에서 근거를 찾는 중 · ${waitSeconds}초`
: dailyBusy === true
? `엔진이 다른 작업을 처리하고 있어요 — 차례가 오면 바로 시작됩니다 (대기 ${waitSeconds}초)`
: dailyBusy === false
? `응답 생성 준비 중 · ${waitSeconds}초`
: `응답 대기 중 · ${waitSeconds}초`
);
// 행동 버튼 노출: daily 모드 + 첫 바이트 전 + 15초 경과
let showWaitActions = $derived(
streaming && !streamingText && mode === 'daily' && waitSeconds >= WAIT_ACTIONS_SEC
);
</script>
<svelte:head>
@@ -473,25 +647,35 @@
</div>
</div>
{:else}
<div class="flex justify-start">
<div class="flex flex-col items-start">
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
{msg.content}
</div>
{#if msg.sources?.length}
<EidEvidenceCard sources={msg.sources} partial={msg.partial ?? false} />
{/if}
</div>
{/if}
{/each}
<!-- 스트리밍 중 assistant 부분 응답 -->
<!-- 스트리밍 중 assistant 부분 응답 / 첫 바이트 전 대기 표시 -->
{#if streaming}
<div class="flex justify-start">
<div class="max-w-[85%] sm:max-w-[75%] px-3.5 py-2.5 rounded-lg rounded-bl-sm bg-surface border border-default text-text text-sm whitespace-pre-wrap break-words">
{#if streamingText}
{streamingText}<span class="inline-block w-1.5 h-3.5 ml-0.5 align-middle bg-accent animate-pulse rounded-sm"></span>
{:else}
<span class="text-dim animate-pulse">응답 준비 중...</span>
<span class="text-dim animate-pulse">{waitPlaceholder}</span>
{/if}
</div>
</div>
<!-- 대기 행동 버튼: daily + 15초 경과 — 전환은 명시 클릭만 (자동 fallback 금지) -->
{#if showWaitActions}
<div class="flex justify-start gap-2">
<Button variant="secondary" size="sm" onclick={switchToDeep}>심층으로 전환</Button>
<Button variant="ghost" size="sm" onclick={cancelWait}>취소</Button>
</div>
{/if}
{/if}
<!-- 에러/안내 카드: 자동 fallback 없이 명시 표시만 -->
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen06 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen06 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4m (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4m (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+152
View File
@@ -0,0 +1,152 @@
"""POST /api/eid/chat mode=deep — ReAct 자동검색 SSE 변환 (ds-eid-ask-absorb P1).
DB·LLM 0: get_session/get_current_user dependency override, probe·agentic_ask_loop·
get_backend monkeypatch. 실제 검색·27B 호출 없음.
검증: 검색성phase:searching+content+eid_sources+DONE / probe 실패503 /
mid-stream BackendUnavailablein-stream error envelope / 대화성sources .
"""
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
import api.eid_chat as eid_chat # noqa: E402
from api.eid_chat import router as eid_chat_router # noqa: E402
from core.auth import get_current_user # noqa: E402
from core.database import get_session # noqa: E402
from services.llm.backends import BackendUnavailable # noqa: E402
from services.search.react_loop import ReactResult # noqa: E402
_DEEP = {"mode": "deep", "messages": [{"role": "user", "content": "콜드박스 위험성평가 찾아줘"}]}
async def _async_true() -> bool:
return True
async def _async_false() -> bool:
return False
def _build_app() -> FastAPI:
app = FastAPI()
app.include_router(eid_chat_router, prefix="/api/eid")
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
id=1, username="test-user"
)
async def _fake_session():
yield None # deep 경로는 session 을 agentic_ask_loop 에 넘기기만(여기선 monkeypatch)
app.dependency_overrides[get_session] = _fake_session
return app
def _data_objs(raw: bytes) -> list[dict]:
out: list[dict] = []
for line in raw.split(b"\n"):
if line.startswith(b"data: ") and line[len(b"data: "):].strip() != b"[DONE]":
try:
out.append(json.loads(line[len(b"data: "):]))
except Exception:
pass
return out
@pytest_asyncio.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as ac:
yield ac
@pytest.fixture(autouse=True)
def _rules_present(monkeypatch):
# D-6 fail-closed 가드 통과 (substrate degraded 아님)
monkeypatch.setattr(eid_chat.eid_compose, "rules_present", lambda: True)
@pytest.mark.asyncio
async def test_deep_search_sse_shape(client, monkeypatch):
"""검색성 질문 → phase:searching + final content + eid_sources + DONE 순서."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _fake_loop(session, query, *, backend, **kw):
return ReactResult(
final_answer="콜드박스 위험성평가는 TK-RA-2026-OT1-01 입니다.",
iterations=1,
partial=False,
sources=[{"id": 1, "doc_id": 10, "title": "OT1 콜드박스 위험성평가", "score": 0.91}],
)
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fake_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200
objs = _data_objs(r.content)
assert "searching" in [o.get("phase") for o in objs if "phase" in o]
content = "".join(
o["choices"][0]["delta"]["content"] for o in objs if "choices" in o
)
assert "OT1-01" in content
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
assert srcs and srcs[0][0]["title"] == "OT1 콜드박스 위험성평가"
assert b"data: [DONE]" in r.content
@pytest.mark.asyncio
async def test_deep_conversational_no_sources(client, monkeypatch):
"""대화성(검색 불요) → ReAct early-exit, sources 빈 배열."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _chat_loop(session, query, *, backend, **kw):
return ReactResult(final_answer="안녕하세요, 이드입니다.", iterations=1, partial=False, sources=[])
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _chat_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200
objs = _data_objs(r.content)
srcs = [o["eid_sources"] for o in objs if "eid_sources" in o]
assert srcs and srcs[0] == [] # 검색 안 함 = 근거 카드 안 뜸
@pytest.mark.asyncio
async def test_deep_probe_fail_503(client, monkeypatch):
"""probe 실패(router 미도달) → 첫 바이트 전 503 router_unreachable."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_false)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 503
assert r.json()["error_reason"] == "router_unreachable"
@pytest.mark.asyncio
async def test_deep_midstream_error_envelope(client, monkeypatch):
"""검색 중 BackendUnavailable(AC 분리 등) → in-stream error envelope + DONE."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_true)
monkeypatch.setattr(eid_chat, "get_backend", lambda name: object())
async def _fail_loop(session, query, *, backend, **kw):
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
monkeypatch.setattr(eid_chat, "agentic_ask_loop", _fail_loop)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 200 # 스트림 이미 시작(probe 통과) → 200 + in-stream error
objs = _data_objs(r.content)
errs = [o for o in objs if o.get("phase") == "error"]
assert errs and errs[0]["error_reason"] == "macbook_unavailable"
assert b"data: [DONE]" in r.content
+6 -2
View File
@@ -131,6 +131,8 @@ async def test_503_substrate_degraded(client, monkeypatch):
@pytest.mark.asyncio
async def test_503_backend_unavailable_prestream(client, monkeypatch):
# call_stream 회귀(prestream 503)는 daily 로 검증 — deep 은 이제 ReAct 별 경로
# (probe·agentic_ask_loop), deep 의 503/midstream 은 test_eid_chat_deep.py 가 커버.
async def fake_call_stream(self, mode, messages, system):
raise BackendUnavailable("qwen-macbook", "macbook_unavailable")
yield b"" # pragma: no cover — async generator 형태 유지용
@@ -138,7 +140,7 @@ async def test_503_backend_unavailable_prestream(client, monkeypatch):
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
r = await client.post(
"/api/eid/chat",
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
)
assert r.status_code == 503
js = r.json()
@@ -192,9 +194,11 @@ async def test_200_midstream_abort_quiet(client, monkeypatch):
raise BackendUnavailable("qwen-macbook", "stream_deadline_exceeded")
monkeypatch.setattr(EidAIClient, "call_stream", fake_call_stream)
# call_stream midstream 회귀는 daily 로 — deep midstream 은 in-stream error envelope
# 경로(test_eid_chat_deep.test_deep_midstream_error_envelope)로 분리됨.
r = await client.post(
"/api/eid/chat",
json={"mode": "deep", "messages": [{"role": "user", "content": "x"}]},
json={"mode": "daily", "messages": [{"role": "user", "content": "x"}]},
)
assert r.status_code == 200
assert r.content == b'data: {"x": 1}\n\n'
+5 -5
View File
@@ -104,7 +104,7 @@ async def test_anthropic_router_url_blocked(monkeypatch):
@pytest.mark.asyncio
async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
"""deep → qwen-macbook alias, system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
"""deep → mac-mini-default alias (맥북 백지화 2026-06-11), system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
seen: dict = {}
def handler(request: httpx.Request) -> httpx.Response:
@@ -139,7 +139,7 @@ async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
]
assert seen["url"].endswith("/v1/chat/completions")
body = seen["json"]
assert body["model"] == "qwen-macbook"
assert body["model"] == "mac-mini-default"
assert body["stream"] is True
assert body["max_tokens"] == 2048
assert body["temperature"] == 0.4
@@ -202,7 +202,7 @@ async def test_prestream_503_maps_reason(monkeypatch):
with pytest.raises(BackendUnavailable) as ei:
await anext(stream)
assert ei.value.reason == "macbook_unavailable"
assert ei.value.backend_name == "qwen-macbook"
assert ei.value.backend_name == "mac-mini-default"
finally:
await c.close()
@@ -253,7 +253,7 @@ async def test_prestream_400_raises_valueerror_failloud(monkeypatch):
c = EidAIClient()
try:
stream = c.call_stream("deep", _MSG, "sys")
with pytest.raises(ValueError, match="router rejected alias='qwen-macbook'"):
with pytest.raises(ValueError, match="router rejected alias='mac-mini-default'"):
await anext(stream)
finally:
await c.close()
@@ -290,7 +290,7 @@ async def test_stream_deadline_exceeded(monkeypatch):
async for _ in stream:
pass
assert ei.value.reason == "stream_deadline_exceeded"
assert ei.value.backend_name == "qwen-macbook"
assert ei.value.backend_name == "mac-mini-default"
finally:
await c.close()
+112
View File
@@ -0,0 +1,112 @@
"""GET /api/eid/status endpoint 테스트 — inline ASGI app (DB 의존 0).
실행 환경: fastapi + httpx 필요 test_eid_chat_endpoint.py 동일 idiom.
DB 0 / LLM 0: get_current_user dependency_overrides 대체, gate 점유는
llm_gate.gate_status monkeypatch (eid_chat 모듈 attribute 호출하므로 유효).
무인증 케이스는 실제 auth 경로지만 decode 단계에서 거부돼 DB 접근 반환.
"""
from __future__ import annotations
import sys
import types
from pathlib import Path
import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "app"))
from api.eid_chat import router as eid_chat_router # noqa: E402
from core.auth import get_current_user # noqa: E402
from services.search import llm_gate # noqa: E402
def _build_app(*, override_auth: bool = True) -> FastAPI:
"""main.py 등록 방식과 동일 prefix(/api/eid)로 라우터만 올린 inline app."""
app = FastAPI()
app.include_router(eid_chat_router, prefix="/api/eid")
if override_auth:
app.dependency_overrides[get_current_user] = lambda: types.SimpleNamespace(
id=1, username="test-user"
)
return app
@pytest_asyncio.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as ac:
yield ac
# ── 401 무인증 ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_unauthenticated_rejected():
async with AsyncClient(
transport=ASGITransport(app=_build_app(override_auth=False)),
base_url="http://test",
) as ac:
# 헤더 자체 부재 — HTTPBearer 단계 거부 (fastapi 기본 403, 버전별 401 허용)
r = await ac.get("/api/eid/status")
assert r.status_code in (401, 403)
# 위조 토큰 — decode_token 실패 → 401 (DB 접근 전 거부)
r2 = await ac.get(
"/api/eid/status", headers={"Authorization": "Bearer bogus-token"}
)
assert r2.status_code == 401
# ── 200 shape ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_200_shape(client, monkeypatch):
"""응답 shape — daily 키 아래 busy/inflight/waiters 3필드, 타입 고정."""
monkeypatch.setattr(
llm_gate, "gate_status", lambda: {"inflight": False, "waiters": 0}
)
r = await client.get("/api/eid/status")
assert r.status_code == 200, r.text
js = r.json()
assert set(js.keys()) == {"daily"}
assert set(js["daily"].keys()) == {"busy", "inflight", "waiters"}
assert isinstance(js["daily"]["busy"], bool)
assert isinstance(js["daily"]["inflight"], bool)
assert isinstance(js["daily"]["waiters"], int)
# ── busy 판정 — gate_status monkeypatch ──────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize(
"snap, expected",
[
# 유휴 — busy=false (근사: 외부 소비자 점유는 미포착)
(
{"inflight": False, "waiters": 0},
{"busy": False, "inflight": False, "waiters": 0},
),
# inflight 만 — busy=true (확실)
(
{"inflight": True, "waiters": 0},
{"busy": True, "inflight": True, "waiters": 0},
),
# waiters 만 — busy=true (inflight or waiters>0 의 or 분기)
(
{"inflight": False, "waiters": 3},
{"busy": True, "inflight": False, "waiters": 3},
),
],
)
async def test_busy_from_gate_status(client, monkeypatch, snap, expected):
monkeypatch.setattr(llm_gate, "gate_status", lambda: dict(snap))
r = await client.get("/api/eid/status")
assert r.status_code == 200, r.text
assert r.json() == {"daily": expected}
+26
View File
@@ -0,0 +1,26 @@
{
"_meta": {
"plan": "embedding-phase2a-1 G-1",
"measured_at": "2026-06-12",
"serving": "Ollama 0.20.0 (GPU container `ollama`), endpoint = POST /api/embed (단일 고정 — legacy /api/embeddings 사용 금지)",
"invariant": "저장=조회 동일 모델+버전, 프롬프트는 역할별 고정 (문서=plain / 쿼리=instruct prefix)"
},
"instruct_prefix_pinned": "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: ",
"models": {
"qwen3-embedding:0.6b": {
"digest": "ac6da0dfba84", "size": "639MB", "dim": 1024, "l2_normalized": true
},
"qwen3-embedding:4b": {
"digest": "df5bd2e3c74c", "size": "2.5GB(Q4)", "dim": 2560, "l2_normalized": true,
"mrl_dimensions_option": {"supported": true, "dimensions=1024": {"dim": 1024, "l2_norm": 1.0, "note": "Ollama 가 truncate+재정규화까지 수행 — 쿼리측 MRL 은 dimensions 옵션으로 처리"}}
}
},
"asymmetric_prefix_effect_0.6b": {
"doc": "압력용기의 수압시험은 설계압력의 1.3배로 실시하며, 시험 중 용접부 누설 여부를 육안으로 확인한다.",
"query": "압력용기 수압시험 기준 압력은?",
"cos_doc_vs_query_plain": 0.7446,
"cos_doc_vs_query_instruct": 0.7606,
"cos_plain_vs_instruct_query": 0.882,
"verdict": "prefix 가 쿼리 임베딩을 실질 변화시키고(0.882) 관련쌍 유사도를 올림(+0.016) — 비대칭 사용 필수"
}
}
+97
View File
@@ -0,0 +1,97 @@
"""Phase 2A E-4 비교기 — baseline vs 후보 run CSV 들의 per-query 판정.
python tests/search_eval/compare_runs.py \
--baseline baselines/<exact 재측정>.csv \
--cand qwen06=<...>.csv --cand qwen4=<...>.csv --cand qwen4m=<...>.csv \
[--epsilon 0.01] [--bootstrap 2000]
판정 출력(plan r3 E-4): 전체 graded NDCG 평균 delta · per-query win/loss/tie(|d|<ε=tie)
· 부트스트랩 95% CI · 카테고리별 평균 · 상위 개선/퇴행 쿼리. failure_expected/error 제외.
"""
from __future__ import annotations
import argparse
import csv
import random
import statistics
from pathlib import Path
def load(path: str) -> dict[str, dict]:
out = {}
with Path(path).open(encoding="utf-8") as f:
for row in csv.DictReader(f):
if row.get("failure_expected", "").lower() in ("true", "1"):
continue
if row.get("error"):
continue
try:
row["_g"] = float(row["graded_ndcg_at_10"])
except (TypeError, ValueError):
continue
out[row["id"]] = row
return out
def bootstrap_ci(deltas: list[float], n: int, seed: int = 42) -> tuple[float, float]:
rng = random.Random(seed)
means = sorted(
statistics.mean(rng.choices(deltas, k=len(deltas))) for _ in range(n)
)
return means[int(0.025 * n)], means[int(0.975 * n)]
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--baseline", required=True)
p.add_argument("--cand", action="append", required=True, metavar="name=csv")
p.add_argument("--epsilon", type=float, default=0.01)
p.add_argument("--bootstrap", type=int, default=2000)
a = p.parse_args()
base = load(a.baseline)
print(f"baseline: {a.baseline} — scored {len(base)}, "
f"graded NDCG mean {statistics.mean(r['_g'] for r in base.values()):.4f}")
for spec in a.cand:
name, path = spec.split("=", 1)
cand = load(path)
ids = sorted(set(base) & set(cand))
if len(ids) != len(base):
print(f"{name}: 교집합 {len(ids)} != baseline {len(base)} — 누락 쿼리 확인")
deltas = [cand[i]["_g"] - base[i]["_g"] for i in ids]
mean_b = statistics.mean(base[i]["_g"] for i in ids)
mean_c = statistics.mean(cand[i]["_g"] for i in ids)
win = sum(1 for d in deltas if d > a.epsilon)
loss = sum(1 for d in deltas if d < -a.epsilon)
tie = len(deltas) - win - loss
lo, hi = bootstrap_ci(deltas, a.bootstrap)
decided = win + loss
win_rate = (win / decided * 100) if decided else 0.0
print(f"\n== {name} ==")
print(f" graded NDCG: {mean_b:.4f}{mean_c:.4f} (delta {mean_c-mean_b:+.4f}, "
f"bootstrap95% [{lo:+.4f}, {hi:+.4f}])")
print(f" per-query: win {win} / loss {loss} / tie {tie} (ε={a.epsilon}) — "
f"win-rate(결정전) {win_rate:.0f}%")
cats: dict[str, list[float]] = {}
for i in ids:
cats.setdefault(base[i].get("category", "?"), []).append(
cand[i]["_g"] - base[i]["_g"]
)
for c in sorted(cats):
ds = cats[c]
cb = statistics.mean(base[i]["_g"] for i in ids if base[i].get("category") == c)
cc = statistics.mean(cand[i]["_g"] for i in ids if base[i].get("category") == c)
print(f" {c:<18} {cb:.3f}{cc:.3f} ({statistics.mean(ds):+.3f}, n={len(ds)})")
ranked = sorted(ids, key=lambda i: cand[i]["_g"] - base[i]["_g"])
worst = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[:3]]
best = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[-3:][::-1]]
print(f" 개선 top3 {best} / 퇴행 top3 {worst}")
if __name__ == "__main__":
main()
+150
View File
@@ -0,0 +1,150 @@
"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify.
worker-process 레벨(DB 필요) deep 폴백·classify drain 라이브 E2E 검증하고,
여기서는 메커니즘의 seam 단위 검증한다.
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
import httpx
import pytest
from core.config import settings
from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status
@pytest.fixture(autouse=True)
def _reset_gate(monkeypatch):
monkeypatch.setattr(settings, "mlx_gate_concurrency", 2)
_reset_for_test()
yield
_reset_for_test()
# ─── gate capacity 2 ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_two_concurrent_holders_overlap():
"""capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다."""
log: list = []
async def hold(label: str):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("in", label))
await asyncio.sleep(0.05)
log.append(("out", label))
await asyncio.gather(hold("a"), hold("b"))
# 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명)
assert log[0][0] == "in" and log[1][0] == "in"
@pytest.mark.asyncio
async def test_third_waits_until_slot_frees():
"""capacity=2: 3번째는 대기, 첫 release 후 진입."""
order: list = []
release_a = asyncio.Event()
async def hold(label: str, wait_event: asyncio.Event | None):
async with acquire_mlx_gate(Priority.BACKGROUND):
order.append(("in", label))
if wait_event:
await wait_event.wait()
else:
await asyncio.sleep(0.01)
order.append(("out", label))
t_a = asyncio.create_task(hold("a", release_a))
t_b = asyncio.create_task(hold("b", release_a))
await asyncio.sleep(0.02) # a, b inflight 진입 대기
assert ("in", "a") in order and ("in", "b") in order
assert gate_status()["inflight"] == 2
t_c = asyncio.create_task(hold("c", None))
await asyncio.sleep(0.02)
assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기
assert gate_status()["waiters"] == 1
release_a.set()
await asyncio.gather(t_a, t_b, t_c)
assert ("in", "c") in order
@pytest.mark.asyncio
async def test_capacity_one_serializes():
"""concurrency=1 이면 기존 직렬화 그대로 (무회귀)."""
from core.config import settings as s
s_backup = s.mlx_gate_concurrency
s.mlx_gate_concurrency = 1
try:
_reset_for_test()
log: list = []
async def hold(label: str):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("in", label))
await asyncio.sleep(0.02)
log.append(("out", label))
await asyncio.gather(hold("a"), hold("b"))
# 직렬: in/out 쌍이 겹치지 않음
assert [e[0] for e in log] == ["in", "out", "in", "out"]
finally:
s.mlx_gate_concurrency = s_backup
# ─── call_deep_or_defer cfg override ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_override():
"""cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출."""
from ai.client import call_deep_or_defer
seen: dict = {}
class FakeClient:
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
async def _request(self, cfg, prompt, system=None):
seen["cfg"] = cfg
return "ok"
async def call_deep(self, prompt, system=None):
seen["cfg"] = self.ai.deep
return "ok"
override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0)
out = await call_deep_or_defer(FakeClient(), "p", cfg=override)
assert out == "ok"
assert seen["cfg"] is override
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_still_defers():
"""cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용."""
from ai.client import call_deep_or_defer
from models.queue import StageDeferred
class FakeClient:
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
async def _request(self, cfg, prompt, system=None):
raise httpx.ConnectError("down")
with pytest.raises(StageDeferred):
await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x"))
# ─── drain stages ────────────────────────────────────────────────────────────
def test_drain_stages_include_classify():
from workers.queue_drain import DRAIN_STAGES
assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"}
+2 -1
View File
@@ -152,8 +152,9 @@ async def test_drain_requires_deep_slot(monkeypatch):
@pytest.mark.asyncio
async def test_drain_rejects_non_drain_stage(monkeypatch):
"""classify 는 2026-06-12 fair-share 로 DRAIN_STAGES 합류 — 거부 대상은 extract 등."""
import workers.queue_drain as qd
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
with pytest.raises(SystemExit):
await qd.drain("classify", 1)
await qd.drain("extract", 1)
+96
View File
@@ -0,0 +1,96 @@
"""Phase 2A (embedding-phase2a-1) — Qwen 후보 디스패처/쿼리 임베딩 단위 테스트."""
from __future__ import annotations
import pytest
from services.search import retrieval_service as rs
def test_resolve_qwen_backends():
for slug in ("cand_qwen06", "cand_qwen4", "cand_qwen4m"):
cfg = rs._resolve_backend(slug)
assert cfg["docs_table"].startswith("documents_cand_qwen")
assert cfg["chunks_table"].startswith("document_chunks_cand_qwen")
assert cfg["embed_kind"] == "ollama"
# 테이블명이 2단계 SQL allowlist 도 통과해야 함 (R2-B1)
assert rs._VALID_DOCS_TABLE.match(cfg["docs_table"])
assert rs._VALID_CHUNKS_TABLE.match(cfg["chunks_table"])
assert rs._resolve_backend("baseline") is None
with pytest.raises(ValueError):
rs._resolve_backend("cand_unknown")
def test_qwen4m_has_mrl_dimensions():
assert rs._resolve_backend("cand_qwen4m")["embed_dimensions"] == 1024
assert "embed_dimensions" not in rs._resolve_backend("cand_qwen4")
class _FakeResp:
def __init__(self, embs):
self._embs = embs
def raise_for_status(self):
return None
def json(self):
return {"embeddings": self._embs}
class _FakeClient:
"""httpx.AsyncClient 대역 — post body 캡처."""
captured: dict = {}
def __init__(self, *a, **k):
pass
async def __aenter__(self):
return self
async def __aexit__(self, *a):
return False
async def post(self, url, json=None):
_FakeClient.captured = {"url": url, "json": json}
dim = (json or {}).get("dimensions") or 1024
return _FakeResp([[0.1] * dim])
@pytest.mark.asyncio
async def test_ollama_query_embed_applies_instruct_prefix(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen06")
out = await rs._embed_query_via_ollama(cfg, "압력용기 수압시험")
assert out is not None and len(out) == 1024
body = _FakeClient.captured["json"]
assert body["model"] == "qwen3-embedding:0.6b"
assert body["input"][0].startswith(rs.QWEN3_QUERY_INSTRUCT)
assert body["input"][0].endswith("압력용기 수압시험")
assert "dimensions" not in body
@pytest.mark.asyncio
async def test_ollama_query_embed_mrl_dimensions(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen4m")
out = await rs._embed_query_via_ollama(cfg, "q")
assert _FakeClient.captured["json"]["dimensions"] == 1024
assert len(out) == 1024
@pytest.mark.asyncio
async def test_ollama_query_embed_failure_returns_none(monkeypatch):
import httpx
class _Boom(_FakeClient):
async def post(self, url, json=None):
raise httpx.ConnectError("down")
monkeypatch.setattr(httpx, "AsyncClient", _Boom)
cfg = rs._resolve_backend("cand_qwen06")
assert await rs._embed_query_via_ollama(cfg, "q") is None
+202
View File
@@ -0,0 +1,202 @@
"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트.
홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉).
-held 스테이지는 기존과 동일하게 처리된다.
"""
import pytest
from core.config import Settings, settings
from workers import digest_worker, queue_consumer
def _fake_consumer_env(monkeypatch, held):
processed = []
async def fake_process(stage, worker):
processed.append(stage)
async def fake_reset(stages, threshold):
return None
monkeypatch.setattr(queue_consumer, "_process_stage", fake_process)
monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset)
monkeypatch.setattr(
queue_consumer, "_load_workers",
lambda: {
s: object()
for s in (queue_consumer.MAIN_QUEUE_STAGES
+ queue_consumer.FAST_QUEUE_STAGES + ["markdown"])
},
)
monkeypatch.setattr(queue_consumer, "_hold_logged", False)
monkeypatch.setattr(settings, "pipeline_held_stages", held)
return processed
def test_settings_default_empty():
"""미설정 시 빈 리스트 = 무동작 (기존 동작 무회귀)."""
assert Settings().pipeline_held_stages == []
@pytest.mark.asyncio
async def test_consume_queue_skips_held_stages(monkeypatch):
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary"]
)
await queue_consumer.consume_queue()
assert "classify" not in processed
assert "summarize" not in processed
assert "deep_summary" not in processed
# 특화 스테이지는 계속 처리 (embed/chunk 는 2026-06-12 fast 컨슈머로 분리)
for stage in ("extract", "stt", "fulltext"):
assert stage in processed
@pytest.mark.asyncio
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_queue()
assert processed == list(queue_consumer.MAIN_QUEUE_STAGES)
@pytest.mark.asyncio
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
"""fast 컨슈머(2026-06-12 분리) = embed/chunk 전용, LLM 사이클과 디커플."""
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_fast_queue()
assert processed == ["embed", "chunk"]
@pytest.mark.asyncio
async def test_fast_consumer_respects_hold(monkeypatch):
processed = _fake_consumer_env(monkeypatch, ["embed"])
await queue_consumer.consume_fast_queue()
assert processed == ["chunk"]
def test_fast_split_invariants():
"""세 컨슈머 stage 집합 disjoint + embed/chunk 배치 상향 회귀 가드."""
main = set(queue_consumer.MAIN_QUEUE_STAGES)
fast = set(queue_consumer.FAST_QUEUE_STAGES)
md = set(queue_consumer.MARKDOWN_QUEUE_STAGES)
assert not (main & fast) and not (main & md) and not (fast & md)
assert fast == {"embed", "chunk"}
assert queue_consumer.BATCH_SIZE["embed"] >= 10
assert queue_consumer.BATCH_SIZE["chunk"] >= 10
@pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch):
"""markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary", "digest"]
)
await queue_consumer.consume_markdown_queue()
assert processed == ["markdown"]
@pytest.mark.asyncio
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])
await digest_worker.run()
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {"clusters": 0}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", [])
await digest_worker.run()
assert called["pipeline"] is True
@pytest.mark.asyncio
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
from workers import briefing_worker
called = {"pipeline": False}
async def fake_pipeline(target_date):
called["pipeline"] = True
return {}
monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])
assert await briefing_worker.run() is None
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_study_explanation_consumer_held(monkeypatch):
from workers import study_queue_consumer
touched = []
async def fake_reset():
touched.append("reset")
monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])
await study_queue_consumer.consume_study_queue()
assert touched == []
@pytest.mark.asyncio
async def test_study_consumers_held_no_db_touch(monkeypatch):
"""held 시 stale reset 포함 DB 접근 0 — claim 미발생 실증."""
from workers import study_memo_card_jobs_consumer, study_session_queue_consumer
touched = []
async def fake_reset_session():
touched.append("session_reset")
async def fake_reset_card():
touched.append("card_reset")
monkeypatch.setattr(
study_session_queue_consumer, "reset_stale_session_jobs", fake_reset_session
)
monkeypatch.setattr(
study_memo_card_jobs_consumer, "reset_stale_card_jobs", fake_reset_card
)
monkeypatch.setattr(
settings, "pipeline_held_stages",
["study_session_analysis", "study_memo_card"],
)
await study_session_queue_consumer.consume_study_session_queue()
await study_memo_card_jobs_consumer.consume_study_memo_card_queue()
assert touched == []
+8 -2
View File
@@ -20,8 +20,14 @@ from services.search.llm_gate import (
@pytest.fixture(autouse=True)
def _reset_gate():
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
def _reset_gate(monkeypatch):
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).
2026-06-12 capacity 일반화 이후 파일의 직렬화 가정 보존을 위해
concurrency=1 고정 (capacity>1 동작은 test_fair_share.py 커버).
"""
from core.config import settings
monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
_reset_for_test()
yield
_reset_for_test()
+11 -2
View File
@@ -379,5 +379,14 @@ def test_build_stages_order_fields_and_age():
assert by["extract"]["failed"] == 2
assert by["extract"]["oldest_pending_age_sec"] is None
# 전 stage 행 존재 (빈 단계 숨김은 FE 몫)
assert {"stage", "pending", "processing", "failed", "done_today",
"oldest_pending_age_sec"} == set(rows[0].keys())
assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h",
"done_today", "oldest_pending_age_sec"} == set(rows[0].keys())
def test_build_stages_exposes_rates():
"""ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료."""
from services.queue_overview import build_stages
stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)}
rows = build_stages(stats)
embed = next(r for r in rows if r["stage"] == "embed")
assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)