Compare commits

...

23 Commits

Author SHA1 Message Date
hyungi 5581d3f1ce feat(board): 처리 보드 v2 — 파이프라인 흐름 뷰·엔진 구분·실패 재시도/건너뛰기 (ds-board-engines-1)
- 흐름 뷰 메인: 좌→우 노드(머신·엔진 태그, 유입 우세 amber, 실패 뱃지) + 머신 스트립(모델 표기) + trend_24h 스파크라인 첫 렌더
- 노드 클릭 상세 패널: KV 4칸 + 다중 stage 행 + 지금 처리 중
- 실패 처리 드로어: 에러 패턴 그룹 + 재시도/건너뛰기 (영구 실패의 첫 사용자 조치 경로)
- API: stages[].done_1h/created_1h 노출 + GET /api/queue/failed + POST /api/queue/retry|/skip (uq_queue_active 충돌 skip, 건너뛰기는 enqueue_next_stage 미호출)
- 엔진/모델 표기 = queueDisplay.ts 정적 맵 단일 지점 (모델 교체 시 1곳)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 01:05:04 +00:00
hyungi 8ac1dbf4a8 test(eval): Phase 2A E-4 비교기 — per-query win/loss/tie(ε)·부트스트랩 CI·카테고리 분해
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:34:18 +09:00
hyungi c3d237766d feat(search): Phase 2A E-1 — Qwen 후보 3종 백필 CLI + eval 디스패처 확장 (마이그 328~333)
- 후보 섀도 테이블 6종(전부 vector 타입 — eval=exact scan 이라 인덱스 불요, halfvec 은 C-1 소관)
- workers/phase2a_cand_backfill: resumable(NOT EXISTS)·배치 커밋·동결셋 한정(--doc/chunk-id-max),
  문서/청크 입력 = production 경로 동일 구성 + plain
- CANDIDATE_BACKEND_MAP += cand_qwen06/qwen4/qwen4m (embed_kind=ollama, 쿼리측 instruct prefix
  G-1 핀 문자열, qwen4m = dimensions 1024 MRL)
- qwen4m 적재는 qwen4 에서 SQL 파생(subvector+l2_normalize) — 본 CLI 비대상

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:29:53 +09:00
hyungi 5bc68c95f6 test(eval): Phase 2A G-1 — Qwen3-Embedding 서빙 fixture 박제 (Ollama 0.20.0, /api/embed)
0.6b=1024d/4b=2560d 정규화 출력, MRL dimensions 옵션 지원(재정규화 포함),
비대칭 instruct prefix 효과 실측(+0.016), instruct 문자열 핀.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 08:14:24 +09:00
hyungi 5dca5b5d28 ops(pipeline): embed/chunk 고속 컨슈머 분리 + 배치 1→10 — LLM 사이클 인질 해소
진단(2026-06-12 용량 평가): 단일 루프에서 classify(~190s×3)가 사이클을 점유,
건당 <1s 인 embed/chunk 가 사이클당 1건 캡 → 실효 ~580/일 vs 수요 최대 2,700/일,
적체 3,570 + 신규 문서 벡터 미적재(RAG 검색 누락). 4070 가동률 0% = 순수 구조 캡.
수리 = markdown 분리(05-01) 선례: consume_fast_queue 1분 잡 + 배치 10(GPU 공유 보수값,
캡 ~14,400/일). 세 컨슈머 stage 집합 disjoint(stale reset 이중 복구 방지). retrieval
로직·임베딩 모델 무접촉.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:50:07 +09:00
hyungi 9c9ff6eeba test(drain): classify 합류 반영 — 거부 케이스를 extract 로 교체
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:22:47 +09:00
hyungi d667545185 fix(classify): 적대 리뷰 반영 — use_deep 스레딩(B1)·StageDeferred 전파(B2)·legacy 호출 deep 경유(M3)
- _run_tier_triage(use_deep) 스레딩 — 미배선 NameError(전 classify 파괴) fix
- process 의 triage try 에 except StageDeferred: raise 선행 (drain 보류 시멘틱 복구)
- legacy classify()/summarize() 에 cfg 파라미터 — use_deep 시 deep 슬롯 경유 +
  is_deferrable_error → StageDeferred 변환(첫 호출 = 최저비용 지점에서 보류, doc 쓰기 0)
- ai_model_version = 실제 처리 경로 모델 (drain=qwen-macbook 귀속)
- analyze_event model_name 스레딩 + deep triage cfg 에 top_p 동승

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:12:40 +09:00
hyungi 235bbf9881 ops(pipeline): fair-share 번들 — drain classify 합류 + deep 맥미니 폴백 + mlx 게이트 동시 2
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버:
- queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling,
  완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지)
- deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 —
  강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지
- llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) —
  'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제
- analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 06:56:02 +09:00
hyungi 30200a4e49 ops(ai): deep 슬롯 재도입 — 맥북 야간 night-drain 레버 (Qwen3.6-27B-6bit)
사용자 지시: 자기 전 night-drain 한 번 실행 → 맥북이 밤새 summarize/deep_summary 분담.
보류 시멘틱(StageDeferred)·drain CLI·라우터 wake preflight = 기존 검증 자산 재사용.
맥북 측 = RunAtLoad=false 수동 기동 + 서버 수명 한정 caffeinate + idle-watch 자동 종료.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 21:49:12 +09:00
hyungi eff2c3b7d3 ops(search): Qwen 27B 속도 반영 — synthesis 30s→120s, classifier 슬롯 모델 동승 교체
- config classifier 모델 gemma 잔존 = mlx 서버 Gemma 재로드(이중 적재) 위험 → Qwen 6bit 로 동승 교체
- synthesis 는 timeout 시 graceful skip 이 없는 답변 본체라 단독 상향 (classifier/query_analyzer/
  rewriter 의 30s/15s 캡은 초과 시 skip·원쿼리 폴백으로 degrade — 관찰 후 별도 튜닝)
- ask.backend.timeout_read_s 30→120 align

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:31:26 +09:00
hyungi 3d79002dfa ops(ai): Qwen 27B 프리필 실측(~112 tok/s) 반영 timeout 상향 — triage 480 / primary 900
장문(context_char_limit 상한급) 프리필이 수 분 걸려 기존 120/300s 로는 timeout 실패 churn.
단일 코루틴 컨슈머라 장문 1건이 사이클을 수 분 점유하는 것은 수용(관찰 후 배칭/컨텍스트 튜닝 PR).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:29:45 +09:00
hyungi 3d60008965 ops(ai)!: 맥미니 생성 모델 Qwen3.6-27B-6bit 전환 + 생성 LLM 홀드 해제
B안(사용자 2026-06-11): Gemma 26B-A4B → Qwen3.6-27B-6bit 풀교체.
- config.yaml triage/primary model 교체 + dense 감속 반영 timeout 상향(30→120/180→300)
- held_stages [] (홀드 해제 — 적체 자연 드레인, deep_summary 는 primary 복귀)
- eid deep 모드 = mac-mini-default 재지정(맥북 백지화). llm_gate '예외 없이 gate' invariant 에
  따라 deep 도 alias 조건으로 자동 게이트 (구 무게이트 = 맥북 별 endpoint 예외였음)
- deep probe 실패 reason = router_unreachable 로 정정 + 테스트 동기화
잔여(별 PR): ask 표면 qwen-macbook 옵션/백엔드 클래스/처리보드 맥북 카드 정리

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 17:19:35 +09:00
hyungi cd0040925a ops(pipeline): 생성 LLM 홀드 게이트 held_stages — 맥미니 모델 확정까지 보류
맥북 LLM 백지화 + 맥미니 모델 재결정에 따라 DS 의 생성 LLM 소비를 일괄 보류.
held = classify/summarize/deep_summary(큐, claim 미발생·attempts 미소모) +
digest(04:00)/briefing(05:10) cron + study explanation/session_analysis/memo_card 컨슈머.
GPU 특화 스테이지·수집기·인터랙티브(ask/eid chat)는 무영향. 기본값 [] = 무동작.
/api/digest/regenerate 는 홀드 중 409 명시. 해제 = config held_stages 비우고 fastapi 재기동.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 16:52:46 +09:00
hyungi fdac449a48 Merge pull request 'Feat/eid chat' (#35) from feat/eid-chat into main
Reviewed-on: #35
2026-06-11 15:14:43 +09:00
hyungi 40f5b5fe9e Merge pull request 'Feat/ds processing board' (#33) from feat/ds-processing-board into main
Reviewed-on: #33
2026-06-11 15:14:24 +09:00
hyungi a410f5b65c fix(ui): 머신 state 우선순위 — 가동 > 보류 (일하는 중엔 백오프 잔여여도 가동)
실측: 맥북이 드레인 처리 중인데도 백오프 잔여 때문에 카드 전체가 '보류'로 표시.
보류 칩은 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속) 한정으로 강등,
보류 건수 자체는 카드의 deferred_pending 라인이 계속 표시.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:36:10 +09:00
hyungi 7031439364 feat(ui): 단계별 현황 재설계 — 완료 가시화 + 빈 단계 숨김 (사용자 피드백)
'대기만 보이고 성공은 안 보인다' 피드백 반영:
- overview 에 stages[] 노출 (stage 별 done_today + oldest_pending_age, SQL 1필드 추가)
- 게이지 의미 전환: 단계 간 대기량 비교(amber) → 단계 내 오늘 진척(완료=green 비율,
  가득 찬 초록 = 다 끝남) + 처리 중 pulse dot
- 움직임 없는 단계는 행 제거, 하단 '비어 있음: ...' 한 줄로
- 라벨 누수 fix: details 가 구 STAGE_LABEL 을 쓰던 것 → queueStageLabel 통일
  (deep_summary/markdown/summarize/chunk/fulltext 한글화)
- 헤더: 오늘 N 완료(성공 가시화) · 실패(error) · 대기. 데이터 소스 = overview 단일화

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:26:27 +09:00
hyungi 468804494d feat(ui): 처리 머신 보드 — 누가 일하나 (안2) + ETA·전 페이지 스트립/드로어 (안5/6 라이트)
plan ds-processing-ui-6an (시안 choice 채택: 안2 1차 + 안5/6 지원):
- GET /api/queue/overview — 머신(GPU/맥미니/맥북) 귀속 라이브 집계 5쿼리, 마이그레이션 0.
  summarize 풀 완료 실적은 documents.ai_model_version 조인으로 맥북/맥미니 분리,
  보류(deferred_until)=맥북 카드 귀속, state=active/deferred/idle. raw 모델명 비노출
- 홈: 처리 머신 보드(3열 카드 + 지금 처리 중 제목) + ETA 라인(유입 우세 시 null 명시),
  기존 stage 테이블은 details 접힘으로 강등 (구조 개편)
- 전 페이지: 상태 스트립(처리중·대기·실패·맥북 칩) + 우측 드로어(QueueDrawer,
  dialog a11y) — 공유 60s 폴링 store, 경량 fetch(401 강제 logout 부수효과 회피)
- tests: 판정부 30건 (귀속/풀 분리/state 9케이스/ETA 경계/trend 버킷/계약 shape)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:13:35 +09:00
hyungi 01db4816fd feat(workers): drain 연속보류 내성 — 네트워크 플랩 흡수 (--defer-retries/--defer-wait)
실측 origin: Tailscale direct 경로 ~10분 플랩(13:25~13:34)으로 300건 run 이 32건에서
조기 종료. 보류 시멘틱 자체는 정상(무손상) — run 지속성만 보강. 연속 보류 5회까지
120s 간격 재시도, 한도 도달 = sleep 판정 종료. 성공 시 카운터 리셋.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 13:42:10 +09:00
hyungi e7c7a2091f fix(workers): 보류 분류에 라우터 502/504 추가 — upstream 절단이 라우터 경유에선 502 로 표면화
llm_router.py 실측: upstream 연결 실패/생성 중 절단 = HTTPException 502 (4곳).
맥북 sleep 절단의 실제 표면이라 503 단독 분류는 보류 누락 → 502/503/504 로 확장.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 13:00:55 +09:00
hyungi 88e5893041 feat(workers): 맥북 M5 Max 분담 배선 — deep 슬롯 + 보류 시멘틱 + queue_drain CLI
plan ds-macbook-offload-1 P2 (Soft Lock 예외 박제 ds-macbook-offload-exec-20260611.md):
- config ai.models.deep optional 슬롯 (라우터 :8890 경유 qwen-macbook, 부재 시 기존 경로)
- AIClient.call_deep + is_deferrable_error + call_deep_or_defer (자동 cloud/맥미니 폴백 0)
- deep_summary_worker: deep 슬롯 시 맥북 경유 (맥미니 mlx gate 미점유) + 실모델 기록
- StageDeferred 보류 시멘틱: 503/connect/read-timeout(sleep 절단) = attempts 미소모 +
  payload.deferred_until 30분 백오프, doc 쓰기는 완주+파싱 후 단일 커밋 (부분 쓰기 0)
- queue_consumer: claim 에 deferred 필터 + StageDeferred 분기
- workers.queue_drain: 수동 burst-drain CLI (summarize/deep_summary, SKIP LOCKED 단건
  claim, per-item 커밋, 보류 시 run 종료, deep 슬롯 필수 가드)
- tests 20건 + 라우터 경유 Qwen 실응답 fixture 박제 (13.2s 라이브)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 12:55:16 +09:00
hyungi 5e8b998a11 feat(documents): hier analyze 서브커맨드 — 재분해와 독립한 절분석 self-heal (g3-t3 갭)
re-decompose 의 char_start 완료마커는 'jump-target char_start 보유'라 컨테이너 recreate/deadline 으로
analyze 가 잘린 doc(char_start 있으나 일부 leaf 미분석)을 재선별 못 함 → rail summary 영구 미수렴 갭.
`analyze` 가 LEAF_SQL(미분석 leaf 보유) 기준 독립 선별로 수렴(멱등, --doc 제한 가능, jump 무관).
sweep 로그도 `analyze` 커맨드 안내로 갱신. (2026-06-10 백필서 recreate 로 잘린 5 doc·53 leaf 수동 처리한 케이스 항구화.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 06:32:10 +09:00
hyungi 53999b2825 fix(documents): g-measure junk 검출 all-caps 과탐 제거 + verdict=coarse 스크린 명시
전부-대문자 휴리스틱이 기술문서 정상 heading(GENERAL REQUIREMENTS/WELDING) 130건 과탐 →
windowed/clean doc 거짓 A_better 강등. 회사-접미사(INC./LLC…)만, cover 영역(앞 4노드)+미stored 게이트.
verdict 는 coarse 스크린(감사용)이고 실집행 결정 = 결정적 partition + 적대 워크플로임을 docstring 박제.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 12:58:36 +09:00
50 changed files with 3737 additions and 126 deletions
+63 -5
View File
@@ -134,6 +134,49 @@ def _fix_json_string_escapes(s: str) -> str:
i += 1
return "".join(out)
def is_deferrable_error(exc: Exception) -> bool:
"""deep(맥북 M5 Max) 호출 실패가 '보류(StageDeferred)' 대상인지 분류 (ds-macbook-offload-1).
보류 = 맥북 일시 불가 신호:
- HTTP 503 (라우터 upstream_cold / editor_busy / warming — no-silent-fallback 계약)
- HTTP 502/504 (라우터가 upstream 연결 실패·생성 도중 절단을 502 로 변환 —
llm_router.py 실측 4곳. 맥북 sleep 절단이 라우터 경유 토폴로지에선 이걸로 표면화)
- httpx.TransportError 전계열 (ConnectError·ReadError·RemoteProtocolError +
ConnectTimeout·ReadTimeout 등) — 라우터 자체 불가 / DS↔라우터 구간 절단.
그 외(400/500, 파싱/검증 오류 등)는 보류가 아니라 호출자의 기존 실패 경로.
"""
if isinstance(exc, httpx.HTTPStatusError):
return exc.response.status_code in (502, 503, 504)
return isinstance(exc, httpx.TransportError)
async def call_deep_or_defer(
client: "AIClient",
prompt: str,
system: str | None = None,
cfg: "AIModelConfig | None" = None,
) -> str:
"""call_deep + 보류 변환 — 맥북 불가(503/연결/절단)는 StageDeferred 로 raise.
deep_summary_worker / summarize_worker(drain) / classify_worker(drain) 가 공유.
StageDeferred 는 queue_consumer/queue_drain 이 attempts 미소모 + deferred_until
백오프로 처리한다 (sleep-안전 불변식).
cfg: 지정 시 deep 슬롯 대신 이 config 로 호출 (classify drain — deep 슬롯의
endpoint 는 쓰되 triage 의 temperature/max_tokens 를 적용한 변형).
"""
from models.queue import StageDeferred
try:
if cfg is not None:
return await client._request(cfg, prompt, system=system)
return await client.call_deep(prompt, system=system)
except Exception as exc:
if is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
# 프롬프트 로딩
PROMPTS_DIR = Path(__file__).parent.parent / "prompts"
@@ -185,22 +228,37 @@ class AIClient:
"""triage/primary 실패 시 최후 방어선. Claude Sonnet 4 API (config.yaml ai.models.fallback) — PR #20 이후 swap 완료."""
return await self._request(self.ai.fallback, prompt)
async def call_deep(self, prompt: str, system: str | None = None) -> str:
"""심층 전용 — 맥북 M5 Max Qwen3.6-27B (config.yaml ai.models.deep, ds-macbook-offload-1).
llm-router :8890 경유(model=qwen-macbook alias) — 라우터의 wake preflight(~24s)·
editor_busy 가드를 재사용한다. 맥미니 mlx gate 와 무관(게이트는 맥미니 보호 목적)이라
gate 없이 호출. 자동 cloud/맥미니 폴백 없음 — 실패는 그대로 전파하고 보류 판단은
호출자가 is_deferrable_error() 로 한다. 슬롯 부재 시 primary 로 처리(방어적 —
호출자가 보통 슬롯 유무를 먼저 분기).
"""
cfg = self.ai.deep or self.ai.primary
return await self._request(cfg, prompt, system=system)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str) -> dict:
async def classify(self, text: str, cfg=None) -> dict:
"""[DEPRECATED] 기존 classify_worker 전용. B-1 에서 summary_triage 로 대체.
호출부 정리 전 존속. 신규 코드는 call_triage + prompt_render 를 쓸 것.
cfg (2026-06-12 fair-share): 지정 시 primary 대신 해당 config 로 호출 —
drain classify 가 deep 슬롯(맥북) 경유에 사용. cfg != ai.primary 라
_call_chat 의 primary→fallback 자동 전환은 발동하지 않는다 (에러 raw 전파).
"""
prompt = CLASSIFY_PROMPT.replace("{document_text}", text)
response = await self._call_chat(self.ai.primary, prompt)
response = await self._call_chat(cfg or self.ai.primary, prompt)
return response
async def summarize(self, text: str, force_premium: bool = False) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체."""
async def summarize(self, text: str, force_premium: bool = False, cfg=None) -> str:
"""[DEPRECATED] 기존 호출부용. B-1 에서 summary_triage 가 tldr 대체. cfg = classify() 와 동일."""
if force_premium:
return await self._call_chat(self.ai.premium, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
return await self._call_chat(cfg or self.ai.primary, f"다음 문서를 500자 이내로 요약해주세요:\n\n{text}")
async def embed(self, text: str) -> list[float]:
"""벡터 임베딩 — GPU 서버 전용"""
+8
View File
@@ -244,7 +244,15 @@ async def regenerate(
user: Annotated[User, Depends(require_admin)],
):
"""수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요)."""
from core.config import settings
from workers.digest_worker import run
# 홀드 중 silent no-op 방지 — 워커 게이트와 동일 조건을 표면에서 명시.
if "digest" in settings.pipeline_held_stages:
raise HTTPException(
status_code=409,
detail="global_digest 보류 중 (config.yaml pipeline.held_stages) — 해제 후 재시도",
)
asyncio.create_task(run())
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
+6 -5
View File
@@ -2,8 +2,9 @@
확정 결정:
- D-1 경로 = /api/eid/chat (main.py prefix=/api/eid + 본 라우터 POST /chat)
- D-2 mode 닫힌 어휘: daily(mac-mini-default) / deep(qwen-macbook). 클라는 mode 만 보냄 —
claude-cloud / auto 금지 (Literal 로 422 차단). 심층(deep) 모드 무게이트.
- D-2 mode 닫힌 어휘: daily / deep — 둘 다 mac-mini-default (맥북 백지화 2026-06-11,
맥미니 Qwen 27B 단일 호스트. deep = ReAct 자동검색 모드 구분). 클라는 mode 만 보냄 —
claude-cloud / auto 금지 (Literal 로 422 차단). 게이트 = alias 기준 자동 적용(무게이트 폐지).
- D-3 독립 /chat 라우트 (frontend) — 본 모듈은 백엔드 API 만.
- D-5 LLM 호출 = EidAIClient.call_stream 한 곳 (이드 egress 봉쇄 불변식 #5,
RouterBackend 직접 호출 금지).
@@ -43,7 +44,7 @@ logger = setup_logger("eid_chat")
router = APIRouter()
# ── ds-eid-ask-absorb P1: deep 모드 = ReAct 자동검색 (qwen-macbook 27B) ──
# ── ds-eid-ask-absorb P1: deep 모드 = ReAct 자동검색 (맥미니 Qwen 27B, 2026-06-11~) ──
# 비생성 reachability probe — router 도달만 확인(coarse). 27B(맥북) 자체 미가용은
# 첫 generate_with_tools 호출의 BackendUnavailable → mid-stream error envelope 로 커버
# (plan: probe 정밀도 불필요, TOCTOU 는 in-stream error 가 처리). ~2s 타임아웃·생성 슬롯 비점유.
@@ -160,10 +161,10 @@ async def _eid_chat_deep(body: ChatRequest, session: AsyncSession) -> StreamingR
"""
# ① 첫 SSE 바이트(=HTTP 200 확정) 전 비생성 probe — router 도달 실패 시 503 (재매핑 가능 구간)
if not await _probe_router_reachable():
return _backend_unavailable_response(body, "macbook_unavailable", "qwen-macbook")
return _backend_unavailable_response(body, "router_unreachable", "mac-mini-default")
query = body.messages[-1].content # 메시지 단독 처리 (마지막 user 턴)
backend = get_backend("qwen-macbook")
backend = get_backend("mac-mini-default")
async def _stream() -> AsyncIterator[bytes]:
# ② phase:searching 방출 = HTTP 200 확정. 이후 미가용은 503 불가 → in-stream error.
+177
View File
@@ -0,0 +1,177 @@
"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
FE 정적 맵 책임).
- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
(자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
"""
from datetime import datetime
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from models.user import User
from services.queue_overview import (
build_overview,
fetch_failed_items,
retry_failed,
skip_failed,
)
router = APIRouter()
class CurrentItem(BaseModel):
"""머신이 지금 처리 중인 문서 (최대 2건)."""
document_id: int
title: str
stage: str
class MachineCard(BaseModel):
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state."""
key: Literal["gpu", "macmini", "macbook"]
label: str
state: Literal["active", "deferred", "idle"]
stages: list[str]
pending: int
processing: int
failed: int
done_1h: int
done_today: int
deferred_pending: int
current: list[CurrentItem]
class SummarizeEta(BaseModel):
"""summarize 풀 ETA — done > inflow 일 때만 eta_minutes 산출."""
pending: int
done_rate_1h: int
inflow_rate_1h: int
eta_minutes: int | None
class TrendBucket(BaseModel):
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
hour: str
inflow: int
done: int
class Totals(BaseModel):
"""전 stage 합계."""
pending: int
processing: int
failed: int
class StageRow(BaseModel):
"""단계별 현황 행 — 흐름 노드/상세 패널용.
done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
"""
stage: str
pending: int
processing: int
failed: int
done_1h: int
created_1h: int
done_today: int
oldest_pending_age_sec: int | None
class QueueOverviewResponse(BaseModel):
machines: list[MachineCard]
stages: list[StageRow]
summarize_eta: SummarizeEta
trend_24h: list[TrendBucket]
totals: Totals
class FailedItem(BaseModel):
"""영구 실패 행 — 실패 드로어 표시 단위."""
id: int
stage: str
document_id: int
title: str
attempts: int
max_attempts: int
error_message: str | None
failed_at: datetime | None
class FailedListResponse(BaseModel):
items: list[FailedItem]
total: int
class QueueActionRequest(BaseModel):
"""재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
ids: list[int] = Field(min_length=1, max_length=300)
class RetryResponse(BaseModel):
requested: int
retried: int
not_retried: int
class SkipResponse(BaseModel):
requested: int
skipped: int
not_skipped: int
@router.get("/overview", response_model=QueueOverviewResponse)
async def get_queue_overview(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
return QueueOverviewResponse.model_validate(await build_overview(session))
@router.get("/failed", response_model=FailedListResponse)
async def get_failed_items(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
items = await fetch_failed_items(session)
return FailedListResponse(
items=[FailedItem.model_validate(i) for i in items],
total=len(items),
)
@router.post("/retry", response_model=RetryResponse)
async def retry_failed_items(
body: QueueActionRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""실패 행 재시도 — attempts 리셋 + pending 복귀.
not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
"""
return RetryResponse.model_validate(await retry_failed(session, body.ids))
@router.post("/skip", response_model=SkipResponse)
async def skip_failed_items(
body: QueueActionRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
return SkipResponse.model_validate(await skip_failed(session, body.ids))
+33
View File
@@ -98,6 +98,10 @@ class AIConfig(BaseModel):
classifier: AIModelConfig | None = None
# Phase 3.5b: semantic verifier (optional — 없으면 grounding-only). PR #20 이후 Mac mini 26B MLX endpoint (initial = exaone3.5).
verifier: AIModelConfig | None = None
# ds-macbook-offload-1: 심층 전용 슬롯 (optional). 맥북 M5 Max Qwen3.6-27B — llm-router :8890
# 경유(model=qwen-macbook alias, wake preflight 재사용). 부재 시 deep_summary 는 기존
# primary(맥미니 26B) 경로 그대로 = 기능 미활성. 명시 opt-in — silent fallback 없음.
deep: AIModelConfig | None = None
# Legacy: vision 슬롯 (현재 사용처 0 — Document Server 는 OCR/STT 별도 서비스).
# 제거 진행 중이므로 optional 로 관대한 로딩 유지.
vision: AIModelConfig | None = None
@@ -154,6 +158,17 @@ class Settings(BaseModel):
# 업로드 한도 (authoritative policy)
upload: UploadConfig = UploadConfig()
# 생성 LLM 홀드 (2026-06-11): config.yaml pipeline.held_stages 에 든 이름의
# 컨슈머/워커는 claim 자체를 하지 않는다 (attempts 미소모, pending 적체 = 의도).
# 유효 키 = 큐 stage 명(classify/summarize/deep_summary) + cron/컨슈머 키(digest,
# briefing, study_explanation, study_session_analysis, study_memo_card).
# 빈 리스트 = 무동작 (기존 동작 그대로).
pipeline_held_stages: list[str] = []
# mlx gate 동시 실행 상한 (2026-06-12, config.yaml pipeline.mlx_gate_concurrency).
# 1 = 구 single-inference 동작. 2 = continuous batching 활용 (llm_gate docstring 참조).
mlx_gate_concurrency: int = 1
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
@@ -218,6 +233,7 @@ def load_settings() -> Settings:
verifier=(
AIModelConfig(**models["verifier"]) if "verifier" in models else None
),
deep=(AIModelConfig(**models["deep"]) if "deep" in models else None),
deep_summary_backlog=DeepSummaryBacklogConfig(
**ai_raw.get("deep_summary_backlog", {})
),
@@ -239,6 +255,21 @@ def load_settings() -> Settings:
)
)
pipeline_held_stages: list[str] = []
mlx_gate_concurrency = 1
if config_path.exists() and raw and "pipeline" in raw:
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
if not isinstance(held_raw, (list, tuple)):
held_raw = [held_raw]
pipeline_held_stages = [str(s) for s in held_raw]
try:
mlx_gate_concurrency = max(
1, int((raw.get("pipeline") or {}).get("mlx_gate_concurrency", 1))
)
except (TypeError, ValueError):
mlx_gate_concurrency = 1
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
upload_cfg = (
@@ -267,6 +298,8 @@ def load_settings() -> Settings:
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
internal_worker_token=internal_worker_token,
pipeline_held_stages=pipeline_held_stages,
mlx_gate_concurrency=mlx_gate_concurrency,
)
+10 -7
View File
@@ -29,16 +29,19 @@ import httpx
from ai.client import AIClient
from services.llm.backends import (
MAC_MINI_DEFAULT,
QWEN_MACBOOK,
BackendUnavailable,
_router_url, # router URL 단일 출처 재사용 (settings → env LLM_ROUTER_URL → MVP default)
)
from services.search.llm_gate import Priority, acquire_mlx_gate
# 이드 채팅 mode → router alias 닫힌 매핑 (D-2). 클라는 mode 만 보냄 — claude-cloud/auto 금지.
# 2026-06-11 맥북 백지화: deep 도 mac-mini-default (맥미니 Qwen 27B 단일 호스트).
# mode 구분은 유지 — deep = ReAct 자동검색 경로(모델이 아니라 동작이 다름).
# 게이트는 alias==MAC_MINI_DEFAULT 조건이라 deep 도 자동으로 mlx gate 적용
# (llm_gate "예외 없이 gate 획득 필수" invariant 충족 — 구 무게이트는 맥북 예외였음).
_CHAT_ALIAS: dict[str, str] = {
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801 gemma-4-26b
"deep": QWEN_MACBOOK, # router named upstream → M5 Max Qwen3.6-27B (무게이트, D-2)
"daily": MAC_MINI_DEFAULT, # router tier_b → Mac mini :8801
"deep": MAC_MINI_DEFAULT, # 맥북 폐기로 동일 upstream — ReAct 검색 모드 구분만 유지
}
# read 는 per-chunk 적용이라 MacBook wake(24s)+토큰 생성 간격 커버. connect 는 내부 router 라 짧게.
@@ -161,10 +164,10 @@ class EidAIClient(AIClient):
_rewrite_sse_line 으로 model 치환(mode 어휘)·usage 제거만 하고 프레이밍은 보존.
취소/disconnect 시 AsyncExitStack 이 response·client 정리(upstream 닫힘 보장).
daily(mac-mini-default)는 Mac mini MLX 단일 inference 영구 룰(llm_gate docstring
"예외 없이 gate 획득 필수")에 따라 acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 —
RouterBackend 의 requires_gate=True 와 동일한 client-side mutex 효과.
deep(qwen-macbook)은 별 endpoint 라 무게이트 (D-2, RouterBackend 동형).
daily/deep 모두 mac-mini-default(2026-06-11 맥북 백지화) → Mac mini MLX 단일
inference 영구 룰(llm_gate docstring "예외 없이 gate 획득 필수")에 따라
acquire_mlx_gate(FOREGROUND) 안에서 스트리밍 — 게이트 조건이 alias 기준이라
deep 도 자동 적용 (구 무게이트는 맥북 별 endpoint 시절 예외였음).
중계 전체(업스트림 진입~종료)는 asyncio.timeout(_STREAM_DEADLINE_S) wall-clock
deadline 안 — llm_gate 계약 "timeout 은 gate 안쪽" 준수(gate 대기엔 미적용).
+7 -1
View File
@@ -22,6 +22,7 @@ from api.events import router as events_router
from api.library import router as library_router
from api.memos import router as memos_router
from api.news import router as news_router
from api.queue_overview import router as queue_overview_router
from api.search import router as search_router
from api.setup import router as setup_router
from api.study_question_progress import router as study_question_progress_router
@@ -60,7 +61,7 @@ async def lifespan(app: FastAPI):
from workers.csb_collector import run as csb_collector_run
from workers.api_standards_collector import run as api_standards_run
from workers.ccps_collector import run as ccps_collector_run
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
@@ -94,6 +95,9 @@ async def lifespan(app: FastAPI):
# 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을
# stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지.
scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer")
# 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 —
# classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례).
scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
@@ -183,6 +187,8 @@ app.include_router(events_router, prefix="/api/events", tags=["events"])
app.include_router(dashboard_router, prefix="/api/dashboard", tags=["dashboard"])
app.include_router(library_router, prefix="/api/library", tags=["library"])
app.include_router(news_router, prefix="/api/news", tags=["news"])
# 처리 머신 보드 (plan ds-processing-ui-6an) — GET /api/queue/overview
app.include_router(queue_overview_router, prefix="/api/queue", tags=["queue"])
app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"])
app.include_router(audio_router, prefix="/api/audio", tags=["audio"])
+5
View File
@@ -14,6 +14,11 @@ from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
# FK("users.id") 해석에 users 테이블 메타데이터 필요 — fastapi 앱은 어차피 전 모델을
# import 하지만, CLI 단독 실행(queue_drain 등)은 본 모듈만 끌어와 INSERT 시
# "could not find table 'users'" 로 실패했다 (2026-06-12 drain 로그 실측). 명시 import.
from models.user import User # noqa: F401
class AnalyzeEvent(Base):
__tablename__ = "analyze_events"
+28 -1
View File
@@ -2,14 +2,41 @@
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, func, or_, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import TIMESTAMP
from core.database import Base
class StageDeferred(Exception):
"""워커가 '지금은 처리 불가 — 자료 손상 없이 보류' 를 선언하는 신호 (ds-macbook-offload-1).
맥북(M5 Max) deep 슬롯 경로 전용: 503(upstream_cold/editor_busy/warming) · 연결 실패 ·
생성 중 절단(read-timeout, 맥북 sleep) 시 raise. queue_consumer/queue_drain 이 attempts 를
소모하지 않고 pending 복귀 + payload.deferred_until 백오프를 기록한다. 결과 쓰기는 호출
완주 + 파싱 성공 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0 (sleep-안전 불변식).
"""
def __init__(self, reason: str, retry_after_minutes: int = 30):
super().__init__(reason)
self.retry_after_minutes = retry_after_minutes
def not_deferred_condition():
"""보류 백오프(payload.deferred_until, ISO 문자열) 가 미래인 행을 claim 에서 제외.
payload 없음 / 키 없음 = 통과. queue_consumer 와 queue_drain 의 claim 이 공유한다.
"""
deferred = ProcessingQueue.payload["deferred_until"].astext
return or_(
deferred.is_(None),
deferred.cast(TIMESTAMP(timezone=True)) <= func.now(),
)
class ProcessingQueue(Base):
__tablename__ = "processing_queue"
+513
View File
@@ -0,0 +1,513 @@
"""처리 머신 보드 + ETA 집계 (plan ds-processing-ui-6an, 안2+안5/6).
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리.
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
귀속 규칙 (단일 진실):
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt · macmini = classify/summarize · macbook = deep_summary
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속).
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook'
이면 macbook 실적, 아니면 macmini 실적.
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
(보류 = 맥북 불가 신호).
"""
from datetime import datetime, timedelta
from posixpath import basename
from zoneinfo import ZoneInfo
from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
KST = ZoneInfo("Asia/Seoul")
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
_GPU_STAGES = (
"extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt",
)
_MACMINI_STAGES = ("classify", "summarize")
_MACBOOK_STAGES = ("deep_summary",)
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
_MACHINE_KEYS = ("gpu", "macmini", "macbook")
_MACHINE_LABELS = {
"gpu": "GPU 서버",
"macmini": "맥미니",
"macbook": "맥북 M5 Max",
}
# 머신 카드당 current 표시 상한
_CURRENT_LIMIT = 2
def stage_machine_map(deep_enabled: bool) -> dict[str, str]:
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속."""
mapping: dict[str, str] = {}
for s in _GPU_STAGES:
mapping[s] = "gpu"
for s in _MACMINI_STAGES:
mapping[s] = "macmini"
for s in _MACBOOK_STAGES:
mapping[s] = "macbook" if deep_enabled else "macmini"
return mapping
def _zero_stage() -> dict:
return {
"pending": 0, "processing": 0, "failed": 0,
"done_1h": 0, "done_today": 0, "done_15m": 0,
"deferred_pending": 0, "created_1h": 0, "oldest_pending_at": None,
}
def rows_to_stage_stats(rows) -> dict[str, dict]:
"""stage×status 집계 쿼리 행 → {stage: {pending, ..., created_1h}} 변환."""
stats: dict[str, dict] = {}
for row in rows:
stats[row[0]] = {
"pending": int(row[1] or 0),
"processing": int(row[2] or 0),
"failed": int(row[3] or 0),
"done_1h": int(row[4] or 0),
"done_today": int(row[5] or 0),
"done_15m": int(row[6] or 0),
"deferred_pending": int(row[7] or 0),
"created_1h": int(row[8] or 0),
"oldest_pending_at": row[9] if len(row) > 9 else None,
}
return stats
def rows_to_summarize_split(rows) -> dict[str, dict]:
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
"""
split = {
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
}
for row in rows:
key = "macbook" if row[0] else "macmini"
split[key]["done_1h"] += int(row[1] or 0)
split[key]["done_today"] += int(row[2] or 0)
split[key]["done_15m"] += int(row[3] or 0)
return split
def display_title(row: dict) -> str:
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
if row.get("title"):
return row["title"]
if row.get("original_filename"):
return row["original_filename"]
if row.get("file_path"):
return basename(row["file_path"].rstrip("/"))
return f"문서 #{row['document_id']}"
def build_machines(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
current_rows: list[dict],
*,
deep_enabled: bool,
) -> list[dict]:
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map(deep_enabled)
def g(stage: str, field: str) -> int:
return stage_stats.get(stage, {}).get(field, 0)
# current 귀속: processing 행을 머신별 최대 2건 (summarize processing → macmini)
current_by_machine: dict[str, list[dict]] = {k: [] for k in _MACHINE_KEYS}
for row in current_rows:
machine = smap.get(row["stage"])
if machine and len(current_by_machine[machine]) < _CURRENT_LIMIT:
current_by_machine[machine].append({
"document_id": row["document_id"],
"title": display_title(row),
"stage": row["stage"],
})
machines = []
for key in _MACHINE_KEYS:
stages = [s for s in _STAGE_ORDER if smap[s] == key]
pending = sum(g(s, "pending") for s in stages)
processing = sum(g(s, "processing") for s in stages)
failed = sum(g(s, "failed") for s in stages)
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
if key in summarize_split:
done_1h += summarize_split[key]["done_1h"]
done_today += summarize_split[key]["done_today"]
done_15m += summarize_split[key]["done_15m"]
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
deferred_pending = (
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
if key == "macbook" else 0
)
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만.
if processing > 0 or done_15m > 0:
state = "active"
elif key == "macbook" and deferred_pending > 0:
state = "deferred"
else:
state = "idle"
machines.append({
"key": key,
"label": _MACHINE_LABELS[key],
"state": state,
"stages": stages,
"pending": pending,
"processing": processing,
"failed": failed,
"done_1h": done_1h,
"done_today": done_today,
"deferred_pending": deferred_pending,
"current": current_by_machine[key],
})
return machines
def compute_eta_minutes(pending: int, done_1h: int, inflow_1h: int) -> int | None:
"""ETA(분) = 순소화율 기반. done > inflow 일 때만 산출, 아니면 None (소화 불가)."""
if done_1h > inflow_1h:
return round(pending / (done_1h - inflow_1h) * 60)
return None
def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
"""summarize 풀 ETA — pending 은 보류(deferred) 포함 총수."""
s = stage_stats.get("summarize", _zero_stage())
pending = s["pending"]
done_rate = s["done_1h"]
inflow_rate = s["created_1h"]
return {
"pending": pending,
"done_rate_1h": done_rate,
"inflow_rate_1h": inflow_rate,
"eta_minutes": compute_eta_minutes(pending, done_rate, inflow_rate),
}
def build_trend(
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
now_kst: datetime,
) -> list[dict]:
"""summarize 24h 추이 — KST 시간 버킷 24개 (오래된 것부터, 빈 버킷 0).
버킷 key = "YYYY-MM-DD HH:00" (KST). SQL to_char 출력과 동일 포맷.
"""
base = now_kst.replace(minute=0, second=0, microsecond=0)
trend = []
for i in range(23, -1, -1):
bucket = base - timedelta(hours=i)
key = bucket.strftime("%Y-%m-%d %H:00")
trend.append({
"hour": bucket.strftime("%H:00"),
"inflow": inflow_buckets.get(key, 0),
"done": done_buckets.get(key, 0),
})
return trend
def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
"""단계별 현황 행 — '단계 상세' 패널용 (2026-06-11 사용자 피드백: 완료가 보여야 한다).
파이프라인 순서 유지, 미지 stage 는 뒤에. 숨김/강조 판단은 FE 몫 — 여기선 사실만.
oldest_pending_age_sec = 가장 오래된 pending 의 경과 초 (pending 없으면 None).
"""
from datetime import datetime, timezone
now = now or datetime.now(timezone.utc)
extra = [s for s in stage_stats if s not in _STAGE_ORDER]
rows = []
for stage in [*_STAGE_ORDER, *extra]:
st = stage_stats.get(stage) or _zero_stage()
oldest = st.get("oldest_pending_at")
age = None
if oldest is not None:
if oldest.tzinfo is None:
oldest = oldest.replace(tzinfo=timezone.utc)
age = max(0, int((now - oldest).total_seconds()))
rows.append({
"stage": stage,
"pending": st["pending"],
"processing": st["processing"],
"failed": st["failed"],
"done_1h": st["done_1h"],
"created_1h": st["created_1h"],
"done_today": st["done_today"],
"oldest_pending_age_sec": age,
})
return rows
def build_totals(stage_stats: dict[str, dict]) -> dict:
"""전 stage 합계."""
return {
"pending": sum(s["pending"] for s in stage_stats.values()),
"processing": sum(s["processing"] for s in stage_stats.values()),
"failed": sum(s["failed"] for s in stage_stats.values()),
}
def compose_overview(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
current_rows: list[dict],
*,
deep_enabled: bool,
now_kst: datetime,
) -> dict:
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
return {
"machines": build_machines(
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
),
"stages": build_stages(stage_stats),
"summarize_eta": build_summarize_eta(stage_stats),
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
"totals": build_totals(stage_stats),
}
# ─── SQL 수집부 (총 5쿼리) ────────────────────────────────────────────────────
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
_STAGE_STATS_SQL = """
SELECT
stage,
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'processing') AS processing,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > NOW() - INTERVAL '15 minutes') AS done_15m,
COUNT(*) FILTER (WHERE status = 'pending'
AND payload ->> 'deferred_until' IS NOT NULL
AND (payload ->> 'deferred_until')::timestamptz > NOW())
AS deferred_pending,
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') AS created_1h,
MIN(created_at) FILTER (WHERE status = 'pending') AS oldest_pending_at
FROM processing_queue
GROUP BY stage
"""
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방)
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
_SUMMARIZE_SPLIT_SQL = """
SELECT
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
AND q.status = 'completed'
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
_TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket,
COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""
_TREND_DONE_SQL = """
SELECT to_char(date_trunc('hour', completed_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket,
COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
AND status = 'completed'
AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
_CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'processing'
ORDER BY q.started_at DESC NULLS LAST
LIMIT 50
"""
async def build_overview(session: AsyncSession) -> dict:
"""5쿼리 수집 → compose_overview 판정 → 응답 dict."""
now_kst = datetime.now(KST)
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
deep_enabled = settings.ai is not None and settings.ai.deep is not None
stage_rows = (
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
).all()
split_rows = (
await session.execute(
text(_SUMMARIZE_SPLIT_SQL),
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
)
).all()
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
current_result = (await session.execute(text(_CURRENT_SQL))).all()
current_rows = [
{
"stage": row[0],
"document_id": row[1],
"title": row[2],
"original_filename": row[3],
"file_path": row[4],
}
for row in current_result
]
return compose_overview(
rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows},
{row[0]: int(row[1]) for row in done_rows},
current_rows,
deep_enabled=deep_enabled,
now_kst=now_kst,
)
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
# started_at 을 시각 fallback 으로 쓴다.
_FAILED_LIST_SQL = """
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
q.error_message,
COALESCE(q.completed_at, q.started_at) AS failed_at,
d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'failed'
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
LIMIT 300
"""
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
_RETRY_SQL = """
UPDATE processing_queue q
SET status = 'pending', attempts = 0,
started_at = NULL, completed_at = NULL
WHERE q.id IN :ids
AND q.status = 'failed'
AND NOT EXISTS (
SELECT 1 FROM processing_queue p
WHERE p.document_id = q.document_id
AND p.stage = q.stage
AND p.status IN ('pending', 'processing')
AND p.id <> q.id
)
RETURNING q.id
"""
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
_SKIP_SQL = """
UPDATE processing_queue
SET status = 'completed', completed_at = NOW(),
payload = COALESCE(payload, '{}'::jsonb)
|| jsonb_build_object('skipped_by_user', true,
'skipped_at', NOW()::text)
WHERE id IN :ids AND status = 'failed'
RETURNING id
"""
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
return [
{
"id": r[0],
"stage": r[1],
"document_id": r[2],
"attempts": int(r[3] or 0),
"max_attempts": int(r[4] or 0),
"error_message": r[5],
"failed_at": r[6],
"title": display_title({
"document_id": r[2],
"title": r[7],
"original_filename": r[8],
"file_path": r[9],
}),
}
for r in rows
]
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
unique_ids = list(set(ids))
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"retried": len(retried),
"not_retried": len(unique_ids) - len(retried),
}
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
unique_ids = list(set(ids))
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"skipped": len(skipped),
"not_skipped": len(unique_ids) - len(skipped),
}
+34 -14
View File
@@ -26,8 +26,11 @@ PR-MacBook-RAG-Backend-1 부터 `services.llm.QwenMacBookBackend` 는 별 endpoi
- **fallback(Claude Sonnet 4 API) 경로는 gate 제외**. PR #20 이후 fallback = Claude API. 단 현재
구현상 `AIClient._call_chat` 내부에서 primaryfallback 전환이 일어나므로
fallback도 gate 점유 상태로 실행된다. 허용 가능(fallback 빈도 낮음).
- **MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**. 모델이 바뀌어도 single-
inference 특성이 깨지지 않는 값을 올리지 .
- ~~**MLX concurrency는 `MLX_CONCURRENCY = 1` 고정**~~ **2026-06-12 개정**:
룰의 전제(서버 = single-inference) 소멸 mlx_vlm server continuous
batching 으로 동시 스트림 흡수(실측). 상한은 config `pipeline.mlx_gate_concurrency`
(기본 1, 운영 2). **게이트 자체(상한+우선순위 ) 영구 유지** thundering herd
(23 concurrent 22 timeout 사고) 방지는 계속 상한이 담당. 무제한 금지.
## 우선순위 정책 (B-1, 2026-05-17)
@@ -80,8 +83,22 @@ from core.utils import setup_logger
logger = setup_logger("llm_gate")
# MLX primary는 single-inference → 1
MLX_CONCURRENCY = 1
def _capacity() -> int:
"""게이트 동시 실행 상한 — config.yaml `pipeline.mlx_gate_concurrency` (기본 1).
2026-06-12 일반화: "MLX_CONCURRENCY = 1 고정" 영구 룰의 전제( 서버 = single-
inference, 23 concurrent 22 timeout 실측) 소멸 mlx_vlm server
continuous batching 으로 동시 스트림을 흡수(2026-06-11 6~8 concurrent 실측
정상). 게이트 자체(상한 + 우선순위) 유지하고 상한만 config thundering
herd 재발 방지는 상한이 계속 담당한다. 런타임 acquire 조회라
config 변경 + 프로세스 재기동으로 반영, 테스트는 settings monkeypatch.
"""
from core.config import settings
try:
return max(1, int(getattr(settings, "mlx_gate_concurrency", 1)))
except (TypeError, ValueError):
return 1
# Background waiter wait_ms 가 이 값 초과 시 WARN (starvation 신호, aging mitigation 은 Phase 2)
STARVATION_WARN_MS = 300_000 # 5 min
@@ -101,7 +118,7 @@ DEFAULT_PRIORITY: Priority = Priority.BACKGROUND
# Tuple format: (priority: int, seq: int, future: asyncio.Future, enqueue_ts: float)
_waiters: list[tuple[int, int, asyncio.Future, float]] = []
_seq = itertools.count()
_inflight: bool = False
_inflight_n: int = 0 # 동시 실행 수 (구 bool — capacity 일반화로 카운터)
_lock: asyncio.Lock | None = None
@@ -143,7 +160,7 @@ async def acquire_mlx_gate(
`asyncio.timeout` 반드시 gate 안쪽 (Future await ) .
"""
global _inflight, _waiters
global _inflight_n, _waiters
lock = _get_lock()
seq = next(_seq)
@@ -152,9 +169,9 @@ async def acquire_mlx_gate(
fut: asyncio.Future | None = None
async with lock:
if not _inflight and not _waiters:
if _inflight_n < _capacity() and not _waiters:
# fast path — 즉시 inflight 진입, Future 생성 안 함
_inflight = True
_inflight_n += 1
else:
# 대기열 진입
fut = asyncio.get_event_loop().create_future()
@@ -194,8 +211,8 @@ async def acquire_mlx_gate(
async with lock:
next_fut = _dispatch_next_locked()
if next_fut is None:
_inflight = False
# _inflight 는 True 유지 (다음 waiter 가 진입 예정)
_inflight_n = max(0, _inflight_n - 1)
# next_fut 가 있으면 슬롯 handover — 카운트 유지 (다음 waiter 가 진입 예정)
logger.debug(
"mlx_gate release duration_ms=%.0f priority=%s seq=%d",
duration_ms, priority.name, seq,
@@ -226,8 +243,11 @@ def get_mlx_gate():
def gate_status() -> dict:
"""현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용)."""
return {"inflight": _inflight, "waiters": len(_waiters)}
"""현재 gate 점유 스냅샷 (read-only, lock-free 근사치 — UI 표시용).
inflight = 동시 실행 (int). 기존 소비자(eid status) bool() 캐스팅이라 호환.
"""
return {"inflight": _inflight_n, "waiters": len(_waiters)}
# ── Test helpers (conftest reset) ────────────────────────────────────────────
@@ -235,8 +255,8 @@ def gate_status() -> dict:
def _reset_for_test() -> None:
"""테스트 fixture 가 fresh loop 마다 호출. production code 에서 사용 X."""
global _waiters, _inflight, _lock, _seq
global _waiters, _inflight_n, _lock, _seq
_waiters = []
_inflight = False
_inflight_n = 0
_lock = None
_seq = itertools.count()
+65 -1
View File
@@ -63,8 +63,41 @@ CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
# 문구 변경 = 저장=조회 불변식 위반과 동급. 쿼리 측 전용, 문서 적재는 plain).
QWEN3_QUERY_INSTRUCT = (
"Instruct: Given a web search query, retrieve relevant passages that answer the query"
"\nQuery: "
)
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
@@ -137,6 +170,34 @@ async def _embed_query_via_tei(endpoint: str, text_: str) -> list[float] | None:
return None
async def _embed_query_via_ollama(cfg: dict, text_: str) -> list[float] | None:
"""Phase 2A 후보 쿼리 임베딩 — Ollama /api/embed + 비대칭 instruct prefix.
쿼리 전용: QWEN3_QUERY_INSTRUCT 선두에 붙인다 (문서 적재 = plain).
embed_dimensions 지정(qwen4m) Ollama dimensions 옵션 = MRL truncate+재정규화
(G-1 fixture: 1024 출력 L2=1.0 실측). cache 미사용 slug 분포 상이.
"""
if not text_:
return None
import httpx
body: dict = {"model": cfg["embed_model"], "input": [QWEN3_QUERY_INSTRUCT + text_]}
if cfg.get("embed_dimensions"):
body["dimensions"] = cfg["embed_dimensions"]
try:
async with httpx.AsyncClient(timeout=60.0) as c:
r = await c.post(cfg["embed_endpoint"], json=body)
r.raise_for_status()
embs = r.json().get("embeddings")
if not isinstance(embs, list) or not embs or not isinstance(embs[0], list):
raise ValueError("unexpected /api/embed shape")
return embs[0]
except Exception as exc:
logger.warning(
"candidate ollama embed failed model=%s err=%r", cfg.get("embed_model"), exc
)
return None
def _query_embed_key(text_: str) -> str:
return hashlib.sha256(f"{text_}|bge-m3".encode("utf-8")).hexdigest()
@@ -323,7 +384,10 @@ async def search_vector(
else:
docs_table = cfg["docs_table"]
chunks_table = cfg["chunks_table"]
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
if cfg.get("embed_kind") == "ollama":
query_embedding = await _embed_query_via_ollama(cfg, query)
else:
query_embedding = await _embed_query_via_tei(cfg["embed_endpoint"], query)
logger.info(
"[embedding-dispatch] backend=%s docs_table=%s chunks_table=%s snapshot_doc_id_max=%s "
+1 -1
View File
@@ -47,7 +47,7 @@ logger = setup_logger("synthesis")
# ─── 상수 (plan 영구 룰) ─────────────────────────────────
PROMPT_VERSION = "v2"
LLM_TIMEOUT_MS = 30000 # 2026-05-17 B-3: 15s 시 동시 부하 (Mac mini 26B classifier+evidence+synthesis serialized) 빈발 timeout — classifier (30s) 와 align
LLM_TIMEOUT_MS = 120000 # 2026-06-11 Qwen3.6-27B-6bit 전환: 프리필 ~112 tok/s·디코드 ~11.7 tok/s 실측 — 30s 면 synthesis(답변 본체) 상시 timeout. synthesis 는 graceful skip 불가(=답변 실패)라 단독 상향, config ask.backend.timeout_read_s=120 와 align
CACHE_TTL = 3600 # 1h (answer 는 원문 변경에 민감 → query_analyzer 24h 보다 짧게)
CACHE_MAXSIZE = 300
MAX_ANSWER_CHARS = 600
+4
View File
@@ -8,6 +8,7 @@
import asyncio
from datetime import date
from core.config import settings
from core.utils import setup_logger
from services.briefing.pipeline import run_briefing_pipeline
@@ -22,6 +23,9 @@ async def run(target_date: date | None = None) -> dict | None:
Args:
target_date: KST 기준 briefing_date (None = 오늘). API regenerate 명시 지정 가능.
"""
if "briefing" in settings.pipeline_held_stages:
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
return None
try:
result = await asyncio.wait_for(
run_briefing_pipeline(target_date),
+63 -14
View File
@@ -31,12 +31,18 @@ from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response, strip_thinking
from ai.client import (
AIClient,
call_deep_or_defer,
is_deferrable_error,
parse_json_response,
strip_thinking,
)
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import enqueue_stage
from models.queue import StageDeferred, enqueue_stage
from policy.prompt_render import render_4b, policy_version as compute_policy_version
from policy.routing import decide_routing
from services.document_telemetry import record_analyze_event
@@ -345,13 +351,20 @@ _FRONTMATTER_PRESERVED_KEYS = {
# ───────────────────────── main process ────────────────────────────────
async def process(document_id: int, session: AsyncSession) -> None:
async def process(
document_id: int, session: AsyncSession, *, use_deep: bool = False
) -> None:
"""문서 분류 + 요약 + tier triage.
1) Legacy: classify() ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion
2) Legacy: summarize() ai_summary
3) PR-B B-1: summary_triage (4B) ai_tldr/ai_bullets/ai_analysis_tier='triage'
use_deep (2026-06-12 fair-share, queue_drain 전용): triage LLM 호출을 deep 슬롯
(맥북, 라우터 경유)으로 보낸다 sampling triage temperature/max_tokens
유지(분류 결정성), endpoint 교체. 맥북 불가 = StageDeferred 전파(drain
보류 처리). False(기본/consumer) = 기존 call_triage(맥미니 직접) 그대로.
예외 source_channel='law_monitor':
법령은 외부 source-of-truth (law.go.kr) 보유 + immutable + 자동 재수집.
AI 분류는 무가치 + 본문 해석 환각 위험. 26B legacy + 4B triage 전부 skip.
@@ -446,10 +459,20 @@ async def process(document_id: int, session: AsyncSession) -> None:
logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완")
client = AIClient()
# fair-share (2026-06-12): use_deep 시 legacy classify/summarize 도 deep 슬롯(맥북)
# 경유 — 그래야 drain 의 "맥북 분담" 이 실제로 성립 (triage 만 보내면 50K 요약
# 프리필이 맥미니에 남는다). deep 슬롯 sampling = primary 와 동일(0.3/0.9/8192).
legacy_cfg = settings.ai.deep if (use_deep and settings.ai.deep is not None) else None
try:
# ─── 1. Legacy classify (primary 26B) ───
# ─── 1. Legacy classify (primary 또는 deep) ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated)
try:
raw_response = await client.classify(truncated, cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
# 맥북 불가 — 첫 호출(최저 비용 지점)에서 보류로 전환, doc 쓰기 0
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
parsed = parse_json_response(raw_response)
if not parsed:
@@ -517,12 +540,17 @@ async def process(document_id: int, session: AsyncSession) -> None:
"reason": "classify pipeline",
}
# ─── 2. Legacy 요약 (primary 26B) ───
summary = await client.summarize(doc.extracted_text[:50000])
# ─── 2. Legacy 요약 (primary 또는 deep) ───
try:
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 (legacy 완료) ───
doc.ai_model_version = settings.ai.primary.model
# ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
doc.ai_model_version = (legacy_cfg or settings.ai.primary).model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
@@ -533,7 +561,9 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
try:
await _run_tier_triage(client, doc, session)
await _run_tier_triage(client, doc, session, use_deep=use_deep)
except StageDeferred:
raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리
except Exception as exc:
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
@@ -541,8 +571,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
await client.close()
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
"""summary_triage (p3a_short_summary) 경로."""
async def _run_tier_triage(
client: AIClient, doc: Document, session: AsyncSession, *, use_deep: bool = False
) -> None:
"""summary_triage (p3a_short_summary) 경로. use_deep = process() 에서 전달 (drain 전용)."""
document_id = doc.id
text = doc.extracted_text or ""
input_chars = len(text)
@@ -550,6 +582,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
triage_start = time.perf_counter()
parse_error: str | None = None
triage_out = TriageOutput()
# drain 경유 시 triage 도 deep 슬롯(맥북) — sampling 은 triage 것 유지(결정성).
deep_triage_cfg = None
if use_deep and settings.ai.deep is not None:
deep_triage_cfg = settings.ai.deep.model_copy(update={
"temperature": settings.ai.triage.temperature,
"top_p": settings.ai.triage.top_p,
"max_tokens": settings.ai.triage.max_tokens,
})
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
if input_chars > TRIAGE_TEXT_LIMIT:
@@ -590,7 +630,14 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])
try:
raw_triage = await client.call_triage(prompt)
if deep_triage_cfg is not None:
# drain 전용 — deep 슬롯 endpoint + triage sampling. 맥북 불가(StageDeferred)
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
else:
raw_triage = await client.call_triage(prompt)
except StageDeferred:
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
except Exception as exc:
logger.warning(
"[triage] 4B 호출 실패 id=%s type=%s repr=%r",
@@ -656,6 +703,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
escalation_reason=escalation_reason,
parse_error=parse_error,
routing_decision=routing_decision,
model_name=(deep_triage_cfg.model if deep_triage_cfg is not None else None),
)
@@ -670,6 +718,7 @@ async def _apply_triage_result(
escalation_reason: str | None,
parse_error: str | None,
routing_decision=None,
model_name: str | None = None, # fair-share: 실제 호출 경로 모델 (None=triage 기본)
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.
@@ -760,7 +809,7 @@ async def _apply_triage_result(
layers_returned=["tldr", "bullets"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.triage.model,
model_name=(model_name or settings.ai.triage.model),
prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
error_code=parse_error,
source="document_server",
+43 -9
View File
@@ -20,12 +20,12 @@ from sqlalchemy.ext.asyncio import AsyncSession
import json
import re
from ai.client import AIClient, parse_json_response, strip_thinking
from ai.client import AIClient, call_deep_or_defer, parse_json_response, strip_thinking
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
@@ -54,8 +54,18 @@ class DeepSummaryOutput(BaseModel):
confidence: float = 0.5
async def process(document_id: int, session: AsyncSession) -> None:
"""deep_summary 큐 pickup → 26B 호출 → 필드 저장."""
async def process(
document_id: int, session: AsyncSession, *, defer_on_deep_unavailable: bool = False
) -> None:
"""deep_summary 큐 pickup → LLM 호출 → 필드 저장.
defer_on_deep_unavailable:
False (기본, consumer 경로) = 맥북(deep 슬롯) 우선 시도, 불가 즉시
맥미니 primary 처리. 2026-06-12 fair-share: 머신이 동일 모델
(Qwen3.6-27B-6bit)이라 폴백 = 품질 강등이 아니라 단순 분배.
True (queue_drain 전용) = 맥북 불가를 StageDeferred 올려 drain
보류 run 멈춘다 (drain = 맥북 분담 전용 레버 시멘틱 유지).
"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"deep_summary: document id={document_id} 없음")
@@ -101,17 +111,40 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
client = AIClient()
# ds-macbook-offload-1: deep 슬롯 구성 시 맥북 M5 Max 경유(라우터). 부재 시 기존 경로 그대로.
deep_cfg = client.ai.deep
used_cfg = deep_cfg or settings.ai.primary
latency_ms = 0
parse_error: str | None = None
deep_out = DeepSummaryOutput()
try:
start = time.perf_counter()
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
if deep_cfg is not None:
# 맥북 우선 — 맥미니 mlx gate 미점유(별 endpoint). doc 쓰기는 완주+파싱
# 후에만 일어나므로 어느 시점에 끊겨도 부분 쓰기 0.
try:
raw = await call_deep_or_defer(client, prompt)
except StageDeferred:
if defer_on_deep_unavailable:
raise # drain 전용 — 맥북 레버 시멘틱 (보류 후 run 종료)
# consumer 경로: 동일 모델이라 강등 아님 — 맥미니가 즉시 처리 (2026-06-12)
logger.info(
f"[deep] id={document_id} 맥북 불가 → 맥미니 primary 처리 (fair-share)"
)
used_cfg = settings.ai.primary
async with acquire_mlx_gate(Priority.BACKGROUND):
raw = await client.call_primary(prompt)
else:
async with acquire_mlx_gate(Priority.BACKGROUND): # 2026-05-17 B-1: classify-escalate worker
raw = await client.call_primary(prompt)
latency_ms = int((time.perf_counter() - start) * 1000)
except StageDeferred:
# 보류는 실패가 아님 — analyze_event 미기록(가짜 완료 방지), drain 이 백오프 기록.
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
logger.warning(f"[deep] 26B 호출 실패 id={document_id}: {exc}")
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
parse_error = "call_failed"
raw = ""
finally:
@@ -147,12 +180,13 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=settings.ai.primary.context_char_limit or 260000,
text_limit=used_cfg.context_char_limit or 260000,
truncated=False,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.primary.model,
# deep 슬롯 사용 시 실처리 모델(qwen-macbook alias) 기록 — 어느 머신이 처리했는지 추적
model_name=used_cfg.model,
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
error_code=parse_error,
source="document_server",
+4
View File
@@ -10,6 +10,7 @@ global_digests / digest_topics 테이블에 저장한다.
import asyncio
from core.config import settings
from core.utils import setup_logger
from services.digest.pipeline import run_digest_pipeline
@@ -24,6 +25,9 @@ async def run() -> None:
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout summarizer 처리).
여기서는 전체 hard cap 강제.
"""
if "digest" in settings.pipeline_held_stages:
logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip")
return
try:
result = await asyncio.wait_for(
run_digest_pipeline(),
+142
View File
@@ -0,0 +1,142 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) 중단/재실행 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path snapshot filter 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker [제목][섹션][본문]) + plain (instruct prefix 쿼리 전용 G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m CLI 대상이 아님 qwen4 적재 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
+69 -5
View File
@@ -13,18 +13,25 @@ from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, enqueue_stage
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
@@ -34,14 +41,21 @@ STALE_THRESHOLD_MINUTES = 10
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리.
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk",
"extract", "classify", "summarize",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
FAST_QUEUE_STAGES = ["embed", "chunk"]
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
@@ -216,13 +230,14 @@ async def _process_stage(stage, worker_fn):
"""
batch_size = BATCH_SIZE.get(stage, 3)
# pending 항목 조회
# pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1)
async with async_session() as session:
result = await session.execute(
select(ProcessingQueue.id, ProcessingQueue.document_id)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
not_deferred_condition(),
)
.order_by(ProcessingQueue.created_at)
.limit(batch_size)
@@ -276,6 +291,26 @@ async def _process_stage(stage, worker_fn):
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
# 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님.
# attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개.
# 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전).
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
continue
item.status = "pending"
item.started_at = None
item.attempts = max(0, item.attempts - 1)
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
await session.commit()
logger.info(
f"[{stage}] document_id={document_id} 보류({defer}) — "
f"{defer.retry_after_minutes}분 후 재개"
)
except Exception as e:
# 실패 처리
async with async_session() as session:
@@ -314,14 +349,43 @@ async def _process_stage(stage, worker_fn):
async def consume_queue():
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
global _hold_logged
workers = _load_workers()
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
if held and not _hold_logged:
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
_hold_logged = True
try:
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("stale reset failed, but continuing queue consumption")
for stage in MAIN_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
async def consume_fast_queue():
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
main 루프는 classify/summarize/deep 사이클을 단위로 점유해 건당 <1s 짜리
embed/chunk 사이클당 1번씩만 기회를 얻었다 (실효 ~60/ = 적체 원인).
분리 = 1 × 배치 10 ~600/. APScheduler max_instances=1 이라
배치가 1분을 넘으면 다음 fire coalesce (폭주 방지).
"""
workers = _load_workers()
try:
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("fast stale reset failed, but continuing queue consumption")
for stage in FAST_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
+195
View File
@@ -0,0 +1,195 @@
"""수동 burst-drain CLI — 맥미니 백로그를 사용자가 의도적으로 맥북(M5 Max)으로 소화.
ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 동일 (컨테이너 실행,
장기 배치 fastapi 재생성 = in-flight 절단이지만 멱등 재실행으로 무손실).
docker compose exec fastapi python -m workers.queue_drain --stage summarize --limit 200
설계 원칙:
- deep 슬롯(config.yaml ai.models.deep) 필수 부재 명시 종료 (silent 강등 금지)
- claim = FOR UPDATE SKIP LOCKED 단건 전이 consumer(1 주기) 이중처리 0
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 1건만 stale recovery
(10) pending 복귀. 재실행 멱등 (summarize ai_summary 존재 skip)
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy/네트워크 플랩): attempts 반환 +
deferred_until 백오프 기록. 연속 보류 --defer-retries(기본 5)회까지 --defer-wait
(기본 120s) 간격 재시도( 단위 플랩 흡수), 한도 도달 = sleep 판정으로 run 종료
불가 상태의 맥북을 계속 두드리지 않는다
- 폴백 0: 맥미니/cloud 강등 없음
"""
import argparse
import asyncio
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
logger = setup_logger("queue_drain")
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 / classify = triage 분류.
# classify 는 2026-06-12 fair-share 로 합류 — 구 제외 사유(plan Q-4 "triage 경량 = 맥미니
# 적합")는 Gemma a4b(42 tok/s) 전제. Qwen 27B 전환 후 classify 가 장문 프리필로 컨슈머
# 사이클을 점유하는 최대 병목이라, 맥북(프리필 ~5배)이 가장 효과적인 분담처다.
# classify 완료 시 enqueue_next_stage(embed/chunk/markdown) 필수 — 누락 = DAG 단절.
DRAIN_STAGES = ("summarize", "deep_summary", "classify")
async def _claim_one(stage: str) -> tuple[int, int] | None:
"""pending 1건을 processing 으로 원자 전이 (SKIP LOCKED — consumer 와 경합 안전)."""
async with async_session() as session:
item = (await session.execute(
select(ProcessingQueue)
.where(
ProcessingQueue.stage == stage,
ProcessingQueue.status == "pending",
not_deferred_condition(),
)
.order_by(ProcessingQueue.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)).scalar_one_or_none()
if item is None:
return None
item.status = "processing"
item.started_at = datetime.now(timezone.utc)
item.attempts += 1
claimed = (item.id, item.document_id)
await session.commit()
return claimed
async def _mark_completed(queue_id: int) -> None:
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await session.commit()
async def _mark_deferred(queue_id: int, defer: StageDeferred) -> None:
"""보류: attempts 반환(미소모) + deferred_until 백오프 — consumer 의 처리와 동형."""
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
item.status = "pending"
item.started_at = None
item.attempts = max(0, item.attempts - 1)
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
await session.commit()
async def _mark_failed(queue_id: int, exc: Exception) -> None:
"""실패: consumer 와 동일 재시도 정책 (attempts >= max → failed, 아니면 pending 복귀)."""
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if item:
err_text = str(exc) or repr(exc) or type(exc).__name__
item.error_message = err_text[:500]
if item.attempts >= item.max_attempts:
item.status = "failed"
else:
item.status = "pending"
item.started_at = None
await session.commit()
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
if stage not in DRAIN_STAGES:
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용")
if settings.ai.deep is None:
raise SystemExit(
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
" (맥미니로의 silent 강등 금지)"
)
from workers.classify_worker import process as classify_process
from workers.deep_summary_worker import process as deep_summary_process
from workers.queue_consumer import enqueue_next_stage
from workers.summarize_worker import process as summarize_process
done = failed = 0
deferred = False
consecutive_defers = 0
while done + failed < limit:
claimed = await _claim_one(stage)
if claimed is None:
logger.info(f"[drain:{stage}] pending 소진 — 종료")
break
queue_id, document_id = claimed
try:
async with async_session() as worker_session:
if stage == "summarize":
await summarize_process(document_id, worker_session, use_deep=True)
elif stage == "classify":
await classify_process(document_id, worker_session, use_deep=True)
else:
# deep_summary: drain 은 맥북 전용 레버 — 불가 시 보류(폴백은 consumer 만)
await deep_summary_process(
document_id, worker_session, defer_on_deep_unavailable=True
)
await worker_session.commit()
await _mark_completed(queue_id)
# 다음 stage 연쇄 — classify 는 embed/chunk/markdown enqueue (consumer 와 동형,
# summarize/deep_summary 는 next_stages 미등록이라 no-op)
await enqueue_next_stage(document_id, stage)
done += 1
consecutive_defers = 0
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
except StageDeferred as defer:
# 일시 불가는 종류가 둘: 진짜 sleep(장시간) vs 일시 네트워크 플랩(분 단위 —
# 2026-06-11 실측: Tailscale direct 경로 ~10분 플랩으로 32/300 조기 종료).
# 연속 보류 한도까지 대기 후 재시도해 플랩을 흡수, 한도 도달 시 종료(sleep 판정).
await _mark_deferred(queue_id, defer)
consecutive_defers += 1
if consecutive_defers >= defer_retries:
deferred = True
logger.warning(
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 연속 보류 "
f"{consecutive_defers}회 한도 도달, run 종료. 맥북 깨운 뒤(또는 "
f"{defer.retry_after_minutes}분 후) 재실행"
)
break
logger.warning(
f"[drain:{stage}] doc={document_id} 맥북 일시 불가({defer}) — "
f"{defer_wait}s 대기 후 재시도 ({consecutive_defers}/{defer_retries})"
)
await asyncio.sleep(defer_wait)
except Exception as exc:
await _mark_failed(queue_id, exc)
failed += 1
logger.error(f"[drain:{stage}] doc={document_id} 실패: {exc}")
# 종료 요약 (잔여 = 지금 시점 pending 수)
async with async_session() as session:
from sqlalchemy import func as sa_func
remaining = (await session.execute(
select(sa_func.count()).select_from(ProcessingQueue).where(
ProcessingQueue.stage == stage, ProcessingQueue.status == "pending",
)
)).scalar_one()
logger.info(
f"[drain:{stage}] 요약 — 완료 {done} · 실패 {failed} · "
f"보류종료 {'' if deferred else '아니오'} · 잔여 pending {remaining}"
)
def main() -> None:
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
parser.add_argument("--defer-retries", type=int, default=5,
help="연속 보류 허용 횟수 — 네트워크 플랩 흡수 (기본 5, 한도 도달 시 종료)")
parser.add_argument("--defer-wait", type=int, default=120,
help="보류 재시도 간 대기 초 (기본 120)")
args = parser.parse_args()
asyncio.run(drain(args.stage, args.limit, args.defer_retries, args.defer_wait))
if __name__ == "__main__":
main()
@@ -14,6 +14,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_memo_card_job import StudyMemoCardJob
@@ -50,6 +51,10 @@ async def reset_stale_card_jobs() -> None:
async def consume_study_memo_card_queue() -> None:
"""APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_memo_card" in settings.pipeline_held_stages:
logger.debug("study_memo_card 보류 (pipeline.held_stages)")
return
await reset_stale_card_jobs()
async with async_session() as session:
+5
View File
@@ -59,6 +59,11 @@ async def reset_stale_study_jobs() -> None:
async def consume_study_queue() -> None:
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트.
# pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관).
if "study_explanation" in settings.pipeline_held_stages:
logger.debug("study_explanation 보류 (pipeline.held_stages)")
return
await reset_stale_study_jobs()
async with async_session() as session:
@@ -12,6 +12,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
@@ -48,6 +49,10 @@ async def reset_stale_session_jobs() -> None:
async def consume_study_session_queue() -> None:
"""APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_session_analysis" in settings.pipeline_held_stages:
logger.debug("study_session_analysis 보류 (pipeline.held_stages)")
return
await reset_stale_session_jobs()
async with async_session() as session:
+35 -7
View File
@@ -2,27 +2,37 @@
P3 of family-adaptive-bengio (2026-05-23): 50k 초과 input sliding window
(cumulative carry-over) 분할 처리. 50k 이하 input 기존 동작 유지.
ds-macbook-offload-1: use_deep=True (queue_drain 전용) 맥북 M5 Max deep 슬롯으로
호출 맥미니 백로그를 사용자가 의도적으로 분담시키는 수동 레버. 기본(consumer) 경로는
use_deep=False 기존 동작 그대로. 맥북 불가 StageDeferred (강등 0, 부분 쓰기 0).
"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, strip_thinking
from ai.client import AIClient, call_deep_or_defer, strip_thinking
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("summarize_worker")
CHUNK_SIZE = 50000
# client.summarize() 의 단일 프롬프트와 동일 문구 — deep 경로가 같은 과업을 수행하도록 고정
SUMMARY_PROMPT_SINGLE = "다음 문서를 500자 이내로 요약해주세요:\n\n{text}"
SUMMARY_PROMPT_CONTINUATION = (
"이전 부분 요약:\n{prior}\n\n다음 부분:\n{text}\n\n"
"위 두 정보를 합쳐 전체 문서를 500자 이내로 요약해주세요."
)
async def process(document_id: int, session: AsyncSession) -> None:
"""문서 AI 요약 생성 (분류 없이 요약만)"""
async def process(document_id: int, session: AsyncSession, *, use_deep: bool = False) -> None:
"""문서 AI 요약 생성 (분류 없이 요약만).
use_deep: queue_drain 전용 deep 슬롯(맥북) 경유. 슬롯 미구성 명시 에러
(silent 강등 금지). consumer 기본 경로는 False (기존 동작 무변경).
"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
@@ -35,13 +45,29 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
client = AIClient()
if use_deep and client.ai.deep is None:
await client.close()
raise ValueError("use_deep=True 인데 config.yaml ai.models.deep 슬롯 미구성 — silent 강등 금지")
used_cfg = client.ai.deep if use_deep else client.ai.primary
async def _summarize_first(text_part: str) -> str:
if use_deep:
return await call_deep_or_defer(client, SUMMARY_PROMPT_SINGLE.format(text=text_part))
return await client.summarize(text_part)
async def _summarize_continuation(prompt: str) -> str:
if use_deep:
return await call_deep_or_defer(client, prompt)
return await client.call_primary(prompt)
try:
text = doc.extracted_text
total_chars = len(text)
if total_chars <= CHUNK_SIZE:
summary = await client.summarize(text)
summary = await _summarize_first(text)
logger.info(
f"[요약] document_id={document_id}: single chunk ({total_chars}자)"
+ (" via deep(맥북)" if use_deep else "")
)
else:
chunks = [text[i:i + CHUNK_SIZE] for i in range(0, total_chars, CHUNK_SIZE)]
@@ -52,10 +78,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
carry = ""
for idx, chunk in enumerate(chunks):
if idx == 0:
partial = await client.summarize(chunk)
partial = await _summarize_first(chunk)
else:
prompt = SUMMARY_PROMPT_CONTINUATION.format(prior=carry, text=chunk)
partial = await client.call_primary(prompt)
partial = await _summarize_continuation(prompt)
carry = strip_thinking(partial)
logger.info(
f"[요약] document_id={document_id}: chunk {idx + 1}/{len(chunks)} done "
@@ -63,8 +89,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
)
summary = carry
# sleep-안전 불변식: 쓰기는 전체 완주 후에만 — 중간 절단은 StageDeferred 로 빠져
# 이 지점에 도달하지 않는다 (carry 는 로컬 변수, doc 무변경).
doc.ai_summary = strip_thinking(summary)
doc.ai_model_version = client.ai.primary.model
doc.ai_model_version = used_cfg.model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
f"[요약] document_id={document_id}: {len(doc.ai_summary)}자 final"
+40 -12
View File
@@ -6,25 +6,40 @@ ai:
models:
# ─── 단일 generation 호스트 routing (2026-05-14 GPU LLM 제거) ───
# GPU Ollama gemma4:e4b-it-q8_0 제거. Mac mini 26B-A4B 가 triage + primary + classifier 모두 흡수.
# fallback 은 Claude Sonnet 4 API (Mac mini 다운 시 자동 trigger, premium 과 budget 공유).
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E
# 2026-06-11 B안: 맥미니 모델 = Gemma 26B-A4B → Qwen3.6-27B-6bit 풀교체 (사용자 결정).
# dense 27B 라 디코드 ~13 tok/s 급 (a4b ~42 대비 감속) → timeout 상향 (triage 30→120, primary 180→300).
# fallback 은 Claude Sonnet 4 API (CLAUDE_API_KEY 미주입 = 비활성).
# plan: ~/.claude/plans/rosy-launching-otter.md §C/§D/§E + project_macmini_model_decision
# triage: 상시 분류·요약·근거 선별. Mac mini 26B (primary 와 동일 endpoint, 짧은 max_tokens).
# triage: 상시 분류·요약·근거 선별. Mac mini Qwen 27B (primary 와 동일 endpoint, 짧은 max_tokens).
triage:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 4096
timeout: 30
timeout: 480 # 프리필 실측 ~112 tok/s — 120K자 장문 커버 (2026-06-11)
context_char_limit: 120000
temperature: 0.0
# primary: 에스컬레이션 전용. 26B MLX (맥미니 Semaphore(1) 보호 대상).
# primary: 에스컬레이션 전용. Qwen 27B MLX (맥미니 Semaphore(1) 보호 대상).
primary:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 8192
timeout: 180
timeout: 900 # 프리필 실측 ~112 tok/s — 260K자 상한 장문 커버 (2026-06-11)
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
# deep: 야간 night-drain 전용 — 맥북 M5 Max Qwen3.6-27B-6bit (llm-router :8890 경유,
# model=qwen-macbook alias). 2026-06-11 재도입 (사용자: 자기 전 night-drain 으로 백로그 분담).
# 맥북 불가(503/연결/절단) = StageDeferred 보류 — 맥미니/cloud 강등 없음, attempts 미소모.
# consumer 의 deep_summary 도 슬롯 존재 시 맥북 경유 (잠들어 있으면 30분 백오프 보류 = 무해).
# 슬롯 제거 시 deep_summary 는 primary(맥미니) 경로 복귀.
deep:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
model: "qwen-macbook"
max_tokens: 8192
timeout: 900
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
@@ -58,9 +73,9 @@ ai:
# classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only).
classifier:
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/gemma-4-26b-a4b-it-8bit"
model: "mlx-community/Qwen3.6-27B-6bit" # 2026-06-11 B안 동승 — gemma id 잔존 시 mlx 서버가 Gemma 를 재로드(이중 적재) 위험
max_tokens: 512
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진 (Mac mini 26B concurrent load). classifier_service.LLM_TIMEOUT_MS=30000 와 align
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진. classifier_service.LLM_TIMEOUT_MS=30000 와 align (초과 = score-only skip, graceful)
# 제거: vision (미사용)
# ─── deep_summary enqueue 폭발 억제 (B-1 R2) ───
@@ -84,7 +99,7 @@ search:
macbook_url: "http://100.118.112.84:8810" # MacBook M5 Max Tailscale interface bind
macbook_model: "mlx-community/Qwen3.6-27B-8bit"
timeout_connect_s: 1 # MacBook sleep/wake 빠른 감지 (자동 fallback 부재 → 빠른 503)
timeout_read_s: 30 # synthesis_service.LLM_TIMEOUT_MS=30000 와 align
timeout_read_s: 120 # 2026-06-11 Qwen 27B(디코드 ~11.7 tok/s) — synthesis_service.LLM_TIMEOUT_MS=120000 와 align
# PR-DocSrv-Ask-ToolCalling-ReAct-1: /api/search/ask/react ReAct loop (qwen-macbook only)
react:
enabled: true
@@ -176,3 +191,16 @@ schedule:
daily_digest: "20:00"
file_watcher_interval_minutes: 5
queue_consumer_interval_minutes: 10
# 생성 LLM 홀드 게이트 (2026-06-11 신설): held_stages 에 든 이름의 컨슈머/워커는 claim 자체를
# 하지 않는다 (attempts 미소모, pending 적체). 유효 키 8 = classify/summarize/deep_summary(큐) +
# digest/briefing(cron) + study_explanation/study_session_analysis/study_memo_card(컨슈머).
# 그 외 문자열은 무동작(오타 주의). 적용/해제 = 리스트 수정 후 fastapi 재기동.
# 이력: 2026-06-11 맥미니 모델 확정까지 8키 홀드 → 同日 Qwen3.6-27B-6bit 전환과 함께 해제([]).
pipeline:
held_stages: []
# mlx gate 동시 실행 상한 (2026-06-12 fair-share): 구 "1 고정" 룰의 전제(single-inference
# 서버)가 소멸 — 현 mlx_vlm 은 continuous batching (2026-06-11 밤 6~8 concurrent 실측 정상).
# 2 = 워커 LLM 호출과 인터랙티브(ask/eid)가 서로 안 막힘 + 집계 throughput ~1.8배.
# 게이트(상한+우선순위)는 유지 — thundering herd 방지. 1 로 되돌리면 구 동작.
mlx_gate_concurrency: 2
@@ -0,0 +1,419 @@
<script lang="ts">
// 처리 머신 보드 v2 — 파이프라인 흐름 뷰 (plan ds-board-engines-1, R2 통합안).
// 메인 = 좌→우 흐름 노드(병목 amber·실패 뱃지), 노드 클릭 = 상세 패널(안1 변형),
// 실패 뱃지 클릭 = 실패 처리 드로어 (재시도/건너뛰기 — 영구 실패의 유일한 조치 경로).
// 데이터 = GET /api/queue/overview (60s 폴링 store) + GET /api/queue/failed (드로어 열 때).
import { api } from '$lib/api';
import { refreshQueueOverview } from '$lib/stores/queueOverview';
import { addToast } from '$lib/stores/toast';
import {
AUX_NODES,
FLOW_NODES,
MACHINE_META,
type FlowNodeDef,
etaShort,
flowStageLabel,
formatAgeSec,
formatRate,
} from '$lib/utils/queueDisplay';
import type {
FailedItem,
FailedListResponse,
MachineCurrentItem,
QueueOverview,
QueueStageRow,
RetryResponse,
SkipResponse,
} from '$lib/types/queue';
let { overview }: { overview: QueueOverview } = $props();
// ─── 노드 통계 합성 ───
interface NodeStats {
def: FlowNodeDef;
/** 다중 stage 노드(청크·임베딩)는 같은 문서가 양쪽 큐에 있어 max — 합산 = 이중계산 */
pending: number;
processing: number;
failed: number; // 실패는 행 단위 사실이라 합산
done1h: number;
created1h: number;
doneToday: number;
oldestAgeSec: number | null;
etaMinutes: number | null;
inflowDominant: boolean;
perStage: QueueStageRow[];
}
const stageBy = $derived(new Map(overview.stages.map((s) => [s.stage, s])));
function nodeStats(def: FlowNodeDef): NodeStats {
const rows = def.stages
.map((s) => stageBy.get(s))
.filter((r): r is QueueStageRow => r != null);
const pending = rows.reduce((m, r) => Math.max(m, r.pending), 0);
const done1h = rows.reduce((m, r) => Math.max(m, r.done_1h), 0);
const created1h = rows.reduce((m, r) => Math.max(m, r.created_1h), 0);
const oldest = rows.reduce<number | null>(
(m, r) => (r.oldest_pending_age_sec == null ? m : Math.max(m ?? 0, r.oldest_pending_age_sec)),
null,
);
return {
def,
pending,
processing: rows.reduce((s, r) => s + r.processing, 0),
failed: rows.reduce((s, r) => s + r.failed, 0),
done1h,
created1h,
doneToday: rows.reduce((m, r) => Math.max(m, r.done_today), 0),
oldestAgeSec: oldest,
etaMinutes: pending > 0 && done1h > 0 ? Math.round((pending / done1h) * 60) : null,
inflowDominant: pending > 0 && created1h > done1h,
perStage: rows,
};
}
const mainNodes = $derived(FLOW_NODES.map(nodeStats));
const auxAll = $derived(AUX_NODES.map(nodeStats));
const auxActive = $derived(
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday > 0),
);
const auxIdle = $derived(
auxAll.filter((n) => n.pending + n.processing + n.failed + n.doneToday === 0),
);
const totalFailed = $derived(overview.totals.failed);
// 머신 스트립 — overview.machines 의 state/처리율 + 정적 모델 메타
const machineStrip = $derived(
overview.machines.map((m) => ({
...m,
meta: MACHINE_META[m.key],
})),
);
// ─── 선택 상태 (노드 상세 / 실패 드로어 — 동시에 하나만) ───
let selected = $state<string | null>(null);
let failOpen = $state(false);
function toggleNode(key: string) {
selected = selected === key ? null : key;
if (selected) failOpen = false;
}
const selectedNode = $derived(
[...mainNodes, ...auxAll].find((n) => n.def.key === selected) ?? null,
);
function nodeCurrent(def: FlowNodeDef): MachineCurrentItem[] {
return overview.machines.flatMap((m) => m.current.filter((c) => def.stages.includes(c.stage)));
}
// ─── 실패 드로어 ───
let failItems = $state<FailedItem[]>([]);
let failLoading = $state(false);
let busy = $state(false);
let expanded = $state<Record<string, boolean>>({});
async function openFailures() {
failOpen = true;
selected = null;
await loadFailures();
}
async function loadFailures() {
failLoading = true;
try {
const r = await api<FailedListResponse>('/queue/failed');
failItems = r.items;
} catch {
addToast('error', '실패 목록을 불러오지 못했습니다');
} finally {
failLoading = false;
}
}
interface FailGroup {
key: string;
stage: string;
pattern: string;
items: FailedItem[];
}
// 그룹핑 = stage + 에러 메시지 prefix(36자) — 같은 원인(ReadTimeout 등) 묶음
const failGroups = $derived.by(() => {
const map = new Map<string, FailGroup>();
for (const it of failItems) {
const pattern = (it.error_message ?? '(메시지 없음)').slice(0, 36);
const key = `${it.stage}::${pattern}`;
const g = map.get(key);
if (g) g.items.push(it);
else map.set(key, { key, stage: it.stage, pattern, items: [it] });
}
return [...map.values()].sort(
(a, b) => a.stage.localeCompare(b.stage) || b.items.length - a.items.length,
);
});
async function retryIds(ids: number[]) {
if (busy || ids.length === 0) return;
busy = true;
try {
const r = await api<RetryResponse>('/queue/retry', {
method: 'POST',
body: JSON.stringify({ ids }),
});
addToast(
'success',
`재시도 ${r.retried}건 큐 재진입${r.not_retried > 0 ? ` (${r.not_retried} 제외 이미 활성/처리됨)` : ''}`,
);
await afterAction();
} catch {
addToast('error', '재시도 요청 실패');
} finally {
busy = false;
}
}
async function skipIds(ids: number[]) {
if (busy || ids.length === 0) return;
busy = true;
try {
const r = await api<SkipResponse>('/queue/skip', {
method: 'POST',
body: JSON.stringify({ ids }),
});
addToast('success', `건너뛰기 ${r.skipped}건 처리 (해당 단계 제외)`);
await afterAction();
} catch {
addToast('error', '건너뛰기 요청 실패');
} finally {
busy = false;
}
}
async function afterAction() {
await Promise.all([loadFailures(), refreshQueueOverview()]);
}
// ─── trend_24h 스파크라인 (summarize 유입 vs 소화 — API 가 주는데 미렌더이던 슬롯) ───
const spark = $derived.by(() => {
const t = overview.trend_24h;
if (!t || t.length === 0) return null;
const max = Math.max(1, ...t.map((b) => Math.max(b.inflow, b.done)));
const w = 120;
const h = 24;
const step = w / Math.max(1, t.length - 1);
const pts = (sel: (b: (typeof t)[number]) => number) =>
t.map((b, i) => `${(i * step).toFixed(1)},${(h - (sel(b) / max) * (h - 3) + 1).toFixed(1)}`).join(' ');
return { inflow: pts((b) => b.inflow), done: pts((b) => b.done) };
});
</script>
<div class="mt-5">
<!-- 헤더: 타이틀 + 요약 24h 스파크라인 + 실패 합계 -->
<div class="flex items-center justify-between gap-3 mb-3">
<div class="text-[11px] font-bold text-dim uppercase tracking-wider">처리 머신</div>
<div class="flex items-center gap-3">
{#if totalFailed > 0}
<button
class="text-[11px] font-semibold text-error hover:underline cursor-pointer"
onclick={openFailures}
>실패 {totalFailed}건 처리</button>
{/if}
{#if spark}
<div class="flex items-center gap-2 text-[10px] text-faint tabular-nums" title="요약(summarize) 단계 24시간 — 유입(회색) vs 소화(녹색)">
<svg width="120" height="24" viewBox="0 0 120 24" class="block">
<polyline points={spark.inflow} fill="none" stroke="currentColor" stroke-width="1.5" class="text-faint" />
<polyline points={spark.done} fill="none" stroke="currentColor" stroke-width="1.5" class="text-success" />
</svg>
<span>요약 24h 유입/소화</span>
</div>
{/if}
</div>
</div>
<!-- 머신 스트립 -->
<div class="flex flex-wrap gap-2 mb-3">
{#each machineStrip as m (m.key)}
<div class="flex items-center gap-2 bg-surface border border-default rounded-full px-3.5 py-1.5 text-xs">
<span class="w-2 h-2 rounded-full shrink-0 {m.state === 'active' ? 'bg-success' : m.state === 'deferred' ? 'bg-warning' : 'bg-faint'}"></span>
<span class="font-bold text-text">{m.meta?.label ?? m.label}</span>
<span class="text-[10px] text-faint font-mono">{m.meta?.model}</span>
<span class="text-[11px] text-dim tabular-nums">{formatRate(m.done_1h)}/h</span>
{#if m.key === 'macbook' && m.deferred_pending > 0}
<span class="text-[10px] font-semibold text-warning tabular-nums">보류 {m.deferred_pending}</span>
{/if}
</div>
{/each}
</div>
<!-- 흐름 노드 -->
<div class="flex items-stretch overflow-x-auto pb-1">
{#each mainNodes as n, i (n.def.key)}
{#if i > 0}
<div class="flex items-center text-faint text-sm px-1.5 shrink-0" aria-hidden="true"></div>
{/if}
<div
class="relative bg-surface border-[1.5px] rounded-card px-3 py-2.5 min-w-[124px] shrink-0 text-left transition-colors cursor-pointer hover:bg-surface-hover
{n.inflowDominant ? 'border-warning' : n.etaMinutes != null && n.def.stages.includes('chunk') ? 'border-success' : 'border-default'}
{selected === n.def.key ? 'node-sel' : ''}"
role="button"
tabindex="0"
onclick={() => toggleNode(n.def.key)}
onkeydown={(e) => { if (e.key === 'Enter' || e.key === ' ') { e.preventDefault(); toggleNode(n.def.key); } }}
title="{n.def.label} — 클릭하면 상세"
>
{#if n.failed > 0}
<button
class="absolute -top-2 -right-1.5 text-[9px] font-extrabold bg-error text-white rounded-full px-1.5 py-px shadow cursor-pointer"
onclick={(e) => { e.stopPropagation(); openFailures(); }}
title="실패 {n.failed}건 — 클릭하면 실패 처리"
>{n.failed}</button>
{/if}
<span class="inline-block text-[9px] font-bold rounded px-1.5 py-px mb-1.5 mtag-{n.def.machine}">
{MACHINE_META[n.def.machine].label} · {n.def.engine}
</span>
<div class="text-xs font-bold text-text flex items-center gap-1.5">
{n.def.label}
{#if n.processing > 0}
<span class="inline-block w-1.5 h-1.5 rounded-full bg-accent animate-pulse" title="처리 중 {n.processing}"></span>
{/if}
{#if n.inflowDominant}
<span class="text-[9px] font-bold text-warning">유입 우세</span>
{/if}
</div>
<div class="text-base font-extrabold tabular-nums tracking-tight leading-tight mt-0.5 text-text">
{n.pending.toLocaleString()}
</div>
<div class="text-[10px] text-dim tabular-nums">
{formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}
{#if n.etaMinutes != null && !n.inflowDominant && n.pending > 0}
· <span class="text-accent font-semibold">{etaShort(n.etaMinutes)}</span>
{/if}
</div>
</div>
{/each}
</div>
<!-- 보조 라인 -->
<p class="text-[10px] text-faint mt-1.5 tabular-nums">
{#each auxActive as n, i (n.def.key)}
{i > 0 ? ' · ' : '보조: '}{n.def.label}({n.def.engine}) 대기 {n.pending.toLocaleString()} · {formatRate(n.done1h)}/h{n.failed > 0 ? ` · 실패 ${n.failed}` : ''}
{/each}
{#if auxIdle.length > 0}
{auxActive.length > 0 ? ' — ' : ''}한가: {auxIdle.map((n) => n.def.label).join(' · ')}
{/if}
— 뉴스 등 일부 소스는 분류/추출을 건너뜀 (흐름 그림은 대표 경로)
</p>
<!-- 상세 패널 (노드 클릭) -->
{#if selectedNode}
<div class="border rounded-card mt-3 overflow-hidden bg-surface detail-frame">
<div class="flex items-center gap-2.5 px-4 py-2.5 text-xs font-bold detail-head">
{selectedNode.def.label}{selectedNode.def.engine}
<span class="text-[10px] font-mono font-medium text-dim bg-surface border border-default rounded px-1.5">{selectedNode.def.sub} · {MACHINE_META[selectedNode.def.machine].label}</span>
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (selected = null)}>닫기</button>
</div>
<div class="px-4 pb-3.5">
<div class="grid grid-cols-2 md:grid-cols-4 gap-2.5 my-2.5">
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">대기</div>
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.pending.toLocaleString()}</div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">처리율 (1h)</div>
<div class="text-lg font-extrabold tabular-nums text-text">{formatRate(selectedNode.done1h)}<span class="text-[11px] text-dim font-semibold">/h</span></div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">오늘 완료</div>
<div class="text-lg font-extrabold tabular-nums text-text">{selectedNode.doneToday.toLocaleString()}</div>
</div>
<div class="bg-bg border border-default rounded-card px-3 py-2">
<div class="text-[9px] text-faint uppercase tracking-wide">소진 예상</div>
<div class="text-lg font-extrabold tabular-nums {selectedNode.inflowDominant ? 'text-warning' : 'text-accent'}">
{#if selectedNode.inflowDominant}유입 우세{:else if selectedNode.etaMinutes != null}{etaShort(selectedNode.etaMinutes)}{:else if selectedNode.pending === 0}한가{:else}{/if}
</div>
</div>
</div>
{#if selectedNode.perStage.length > 1}
{#each selectedNode.perStage as row (row.stage)}
<div class="flex items-center gap-2.5 py-1.5 border-t border-default text-xs">
<span class="font-semibold text-text min-w-[72px]">{flowStageLabel(row.stage)}</span>
<span class="ml-auto text-dim tabular-nums">
대기 <strong class="text-text">{row.pending.toLocaleString()}</strong>
· {formatRate(row.done_1h)}/h · 오늘 {row.done_today.toLocaleString()}
{#if row.failed > 0}· <span class="text-error font-semibold">실패 {row.failed}</span>{/if}
</span>
</div>
{/each}
{/if}
<div class="text-[11px] text-dim border-t border-dashed border-default mt-2 pt-2 tabular-nums">
{#if selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600}
가장 오래 기다린 항목 {formatAgeSec(selectedNode.oldestAgeSec)}
{/if}
{#each nodeCurrent(selectedNode.def) as c, i (c.document_id + c.stage)}
{i === 0 && !(selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600) ? '' : ' · '}지금: {c.title} ({flowStageLabel(c.stage)})
{/each}
{#if selectedNode.failed > 0}
· <button class="text-error font-semibold cursor-pointer hover:underline" onclick={openFailures}>실패 {selectedNode.failed} 처리</button>
{/if}
</div>
</div>
</div>
{/if}
<!-- 실패 처리 드로어 -->
{#if failOpen}
<div class="border border-error/40 rounded-card mt-3 overflow-hidden bg-surface">
<div class="flex items-center gap-2.5 px-4 py-2.5 bg-error/5 text-xs font-bold text-text">
실패 처리
<span class="text-[10px] font-semibold text-error">영구 실패 {failItems.length}건 — 자동 재시도 3회 소진, 수동 조치 대기</span>
<button class="ml-auto text-[11px] text-dim font-normal cursor-pointer hover:text-text" onclick={() => (failOpen = false)}>닫기</button>
</div>
{#if failLoading}
<p class="text-xs text-dim text-center py-4">불러오는 중…</p>
{:else if failItems.length === 0}
<p class="text-xs text-dim text-center py-4">영구 실패 항목 없음</p>
{:else}
{#each failGroups as g (g.key)}
<div class="px-4 py-2.5 border-t border-default">
<div class="flex items-center gap-2 flex-wrap text-xs font-bold text-text mb-1">
{flowStageLabel(g.stage)} {g.items.length}
<span class="text-[10px] font-mono font-medium text-error bg-error/10 rounded px-1.5 py-px">{g.pattern}{g.items[0]?.error_message && g.items[0].error_message.length > 36 ? '…' : ''}</span>
</div>
{#each expanded[g.key] ? g.items : g.items.slice(0, 4) as it (it.id)}
<div class="flex items-center gap-2.5 py-1 border-t border-dashed border-default/60 text-xs">
<span class="flex-1 min-w-0 truncate text-text" title={it.title}>{it.title}</span>
<span class="text-[10px] font-mono text-faint shrink-0 tabular-nums">시도 {it.attempts}/{it.max_attempts}</span>
<span class="text-[10px] font-mono text-error shrink-0 max-w-[260px] truncate" title={it.error_message ?? ''}>{it.error_message ?? ''}</span>
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds([it.id])}>재시도</button>
<button class="text-[10px] font-bold border border-default text-faint rounded px-2 py-0.5 shrink-0 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds([it.id])}>건너뛰기</button>
</div>
{/each}
{#if g.items.length > 4 && !expanded[g.key]}
<button class="text-[10px] text-dim cursor-pointer hover:text-text mt-1" onclick={() => (expanded = { ...expanded, [g.key]: true })}> {g.items.length - 4} 펼치기</button>
{/if}
{#if g.items.length > 1}
<div class="flex gap-2 mt-1.5">
<button class="text-[10px] font-bold border border-accent text-accent rounded px-2.5 py-0.5 cursor-pointer hover:bg-accent/10 disabled:opacity-40" disabled={busy} onclick={() => retryIds(g.items.map((x) => x.id))}>그룹 전체 재시도 ({g.items.length})</button>
<button class="text-[10px] font-bold border border-default text-faint rounded px-2.5 py-0.5 cursor-pointer hover:bg-surface-hover disabled:opacity-40" disabled={busy} onclick={() => skipIds(g.items.map((x) => x.id))}>그룹 전체 건너뛰기</button>
</div>
{/if}
</div>
{/each}
<p class="text-[10px] text-faint px-4 py-2 border-t border-default">
재시도 = 시도 횟수 리셋 후 큐 재진입 (자동 재시도 3회 새로 부여) · 건너뛰기 = 이 단계 완료 처리(후속 단계 연쇄 없음, 감사 마킹) · 같은 오류가 반복되는 항목(빈 텍스트 등)은 건너뛰기 권장
</p>
{/if}
</div>
{/if}
</div>
<style>
/* 머신 색 — 디자인 토큰 외 3색 (gpu 청/macmini 보라/macbook 황) — 이 컴포넌트 한정 */
.mtag-gpu { background: #e7eef6; color: #3b6ea5; }
.mtag-macmini { background: #efe9f7; color: #8a5fbf; }
.mtag-macbook { background: #f7eedd; color: #b07a10; }
.node-sel { outline: 2px solid #3b6ea5; outline-offset: 1px; }
.detail-frame { border-color: #3b6ea5; }
.detail-head { background: #e7eef6; }
</style>
@@ -0,0 +1,106 @@
<script lang="ts">
// 처리 현황 드로어 (안6 라이트) — 전 페이지 상태 스트립 클릭 시 우측에서 열림.
// 머신 미니카드 3 + ETA 한 줄 + 실패 합계 + 홈 링크 축약본. 상세는 홈 보드가 담당.
// 데이터 = queueOverview store 공유 (60s 폴링, 실패 시 null → 안내문으로 degrade).
// 열림 상태는 uiState 단일 drawer slot('queue') — 사이드바 드로어와 동시 오픈 차단.
import { X } from 'lucide-svelte';
import { ui } from '$lib/stores/uiState.svelte';
import { queueOverview } from '$lib/stores/queueOverview';
import {
MACHINE_STATE_LABEL, machineChipClass, machineDotClass, formatRate, etaPhrase,
} from '$lib/utils/queueDisplay';
import IconButton from '$lib/components/ui/IconButton.svelte';
let open = $derived(ui.isDrawerOpen('queue'));
let data = $derived($queueOverview);
function close() {
ui.closeDrawer();
}
// ESC 닫기 — 레이아웃 전역 핸들러(ui.handleEscape)와 중복돼도 무해(멱등).
// modal stack 이 열려 있으면 modal 우선 (전역 우선순위와 동일).
function onWindowKeydown(e: KeyboardEvent) {
if (e.key === 'Escape' && open && ui.modalStack.length === 0) close();
}
</script>
<svelte:window onkeydown={onWindowKeydown} />
{#if open}
<div class="fixed inset-0 z-drawer">
<!-- 스크림 — 클릭 시 닫기 -->
<button
type="button"
onclick={close}
class="absolute inset-0 bg-scrim transition-opacity"
aria-label="드로어 닫기"
></button>
<!-- 패널 — div + role="dialog" (aside 는 interactive role 불가, a11y 경고) -->
<div
role="dialog"
aria-modal="true"
aria-label="처리 현황"
class="absolute right-0 top-0 bottom-0 w-rail max-w-full bg-sidebar shadow-xl overflow-y-auto"
>
<div class="flex items-center justify-between px-4 h-12 border-b border-default">
<span class="text-sm font-bold text-text">처리 현황</span>
<IconButton icon={X} size="sm" aria-label="닫기" onclick={close} />
</div>
<div class="p-4 space-y-3">
{#if data}
<!-- 머신 미니카드 3 -->
{#each data.machines as m (m.key)}
<div class="bg-surface border border-default rounded-lg px-3.5 py-2.5">
<div class="flex items-center justify-between gap-2">
<span class="flex items-center gap-2 text-[13px] font-semibold text-text min-w-0">
<span class="w-2 h-2 rounded-full shrink-0 {machineDotClass(m.state)}"></span>
<span class="truncate">{m.label}</span>
</span>
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(m.state)}">
{MACHINE_STATE_LABEL[m.state]}
</span>
</div>
<div class="text-[11px] text-dim mt-1 tabular-nums">
대기 <strong class="text-text">{m.pending.toLocaleString()}</strong>
· 오늘 <strong class="text-text">{m.done_today.toLocaleString()}</strong>건 처리
</div>
</div>
{/each}
<!-- ETA 한 줄 (안5 라이트 — 추정치) -->
<div
class="text-[11px] text-dim leading-relaxed tabular-nums"
title="현재 페이스 기반 추정치 — 유입 변동 시 달라질 수 있습니다"
>
요약 대기 <strong class="text-text">{data.summarize_eta.pending.toLocaleString()}</strong>
— 소화 {formatRate(data.summarize_eta.done_rate_1h)}/h
· 유입 {formatRate(data.summarize_eta.inflow_rate_1h)}/h
{#if data.summarize_eta.eta_minutes != null}
· <span class="text-accent font-semibold">{etaPhrase(data.summarize_eta.eta_minutes)}</span>
{:else}
· 유입 우세(백필 중)
{/if}
</div>
<!-- 실패 합계 -->
{#if data.totals.failed > 0}
<div class="text-[11px] font-semibold text-error bg-error/10 rounded-md px-2.5 py-1.5 tabular-nums">
실패 {data.totals.failed.toLocaleString()}건 — 확인 필요
</div>
{/if}
{:else}
<p class="text-xs text-dim">처리 현황을 불러오지 못했습니다.</p>
{/if}
<a
href="/"
onclick={close}
class="block text-xs text-accent font-semibold hover:underline pt-1"
>홈에서 자세히 →</a>
</div>
</div>
</div>
{/if}
+63
View File
@@ -0,0 +1,63 @@
// 처리 큐 overview store — GET /api/queue/overview 를 60초 주기로 폴링.
// system.ts 의 dashboardSummary 와 같은 구독 기반 패턴 (첫 subscribe 시 시작).
//
// 의도적으로 api() 헬퍼를 쓰지 않는다 — 폴링 경로의 401 이 refresh 실패 →
// window.location='/login' 강제 logout 부수효과를 일으키면 안 됨 (eid 리뷰
// finding 재발 방지). 백엔드 미배포(404)/401/네트워크 실패 전부 silent 하게
// null 로 수렴하고, 소비자(스트립/보드/드로어)는 null 이면 스스로 숨는다.
import { writable } from 'svelte/store';
import { browser } from '$app/environment';
import { getAccessToken } from '$lib/api';
import type { QueueOverview } from '$lib/types/queue';
const POLL_INTERVAL_MS = 60_000;
let pollHandle: ReturnType<typeof setInterval> | null = null;
let subscriberCount = 0;
let inFlight: Promise<void> | null = null;
const internal = writable<QueueOverview | null>(null, (_set) => {
subscriberCount += 1;
if (subscriberCount === 1 && browser) {
void refreshQueueOverview();
pollHandle = setInterval(() => void refreshQueueOverview(), POLL_INTERVAL_MS);
}
return () => {
subscriberCount -= 1;
if (subscriberCount === 0 && pollHandle) {
clearInterval(pollHandle);
pollHandle = null;
}
};
});
export const queueOverview = { subscribe: internal.subscribe };
/** 경량 fetch — 실패는 전부 null (silent 비차단, 강제 logout 경로 없음) */
async function fetchOverview(): Promise<QueueOverview | null> {
try {
const headers: Record<string, string> = {};
const token = getAccessToken();
if (token) headers['Authorization'] = `Bearer ${token}`;
const res = await fetch('/api/queue/overview', { headers, credentials: 'include' });
if (!res.ok) return null;
return (await res.json()) as QueueOverview;
} catch {
return null;
}
}
/** 수동/추가 폴링용 — 홈은 자체 30s interval 로 이 함수를 호출 (동시 fetch 합치기) */
export async function refreshQueueOverview(): Promise<void> {
if (!browser) return;
if (inFlight) return inFlight;
inFlight = (async () => {
try {
internal.set(await fetchOverview());
} finally {
inFlight = null;
}
})();
return inFlight;
}
+5 -3
View File
@@ -3,7 +3,9 @@
// (toast는 별도 store. drawer가 persistent inline panel(예: xl+ meta rail)일 때는
// 여기 시스템 밖이다 — 그저 레이아웃의 일부.)
type Drawer = { id: 'sidebar' | 'meta' } | null;
// 'queue' = 처리 현황 드로어 (상태 스트립 클릭 시 우측) — 단일 slot 규칙 동일
export type DrawerId = 'sidebar' | 'meta' | 'queue';
type Drawer = { id: DrawerId } | null;
type Modal = { id: string };
class UIState {
@@ -11,14 +13,14 @@ class UIState {
modalStack = $state<Modal[]>([]);
// ── Drawer (단일 slot) ──────────────────────────────
openDrawer(id: 'sidebar' | 'meta') {
openDrawer(id: DrawerId) {
// 새 drawer 열면 이전 drawer는 자동으로 사라진다 (단일 slot)
this.drawer = { id };
}
closeDrawer() {
this.drawer = null;
}
isDrawerOpen(id: 'sidebar' | 'meta') {
isDrawerOpen(id: DrawerId) {
return this.drawer?.id === id;
}
+108
View File
@@ -0,0 +1,108 @@
/**
* GET /api/queue/overview .
*
* Backend (feat/ds-processing-board).
* .
*/
export type MachineKey = 'gpu' | 'macmini' | 'macbook';
/** 머신 상태 — active(가동) / deferred(보류) / idle(대기) */
export type MachineState = 'active' | 'deferred' | 'idle';
/** 머신이 지금 처리 중인 문서 1건 */
export interface MachineCurrentItem {
document_id: number;
title: string;
stage: string;
}
export interface MachineOverview {
key: MachineKey;
label: string;
state: MachineState;
/** 담당 단계 키 목록 (extract/classify/... — 홈 STAGE_LABEL 로 한글화) */
stages: string[];
pending: number;
processing: number;
failed: number;
/** 최근 1시간 완료 건수 (처리율 N/h 표기) */
done_1h: number;
done_today: number;
/** 보류 건수 — 맥북 sleep 등으로 자동 재개 대기 중 */
deferred_pending: number;
current: MachineCurrentItem[];
}
/** 요약 백로그 ETA (안5 라이트) — 추정치, 유입 변동 시 오차 */
export interface SummarizeEta {
pending: number;
done_rate_1h: number;
inflow_rate_1h: number;
/** null = 유입이 소화를 앞섬 (백필 중) — 소진 예상 불가 */
eta_minutes: number | null;
}
/** 시간당 유입 vs 소화 (이번 트랙 미렌더 — 후속 추세 위젯 슬롯) */
export interface TrendPoint {
hour: string;
inflow: number;
done: number;
}
export interface QueueTotals {
pending: number;
processing: number;
failed: number;
}
export interface QueueStageRow {
stage: string;
pending: number;
processing: number;
failed: number;
/** 최근 1시간 완료 — 노드 처리율·ETA 재료 (ds-board-engines-1) */
done_1h: number;
/** 최근 1시간 유입 — 유입 우세 판정 재료 (ds-board-engines-1) */
created_1h: number;
done_today: number;
oldest_pending_age_sec: number | null;
}
export interface QueueOverview {
machines: MachineOverview[];
summarize_eta: SummarizeEta;
trend_24h: TrendPoint[];
stages: QueueStageRow[];
totals: QueueTotals;
}
/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */
export interface FailedItem {
id: number;
stage: string;
document_id: number;
title: string;
attempts: number;
max_attempts: number;
error_message: string | null;
failed_at: string | null;
}
export interface FailedListResponse {
items: FailedItem[];
total: number;
}
export interface RetryResponse {
requested: number;
retried: number;
not_retried: number;
}
export interface SkipResponse {
requested: number;
skipped: number;
not_skipped: number;
}
+117
View File
@@ -0,0 +1,117 @@
// 처리 머신 보드 / 상태 스트립 / 드로어 공용 표시 헬퍼.
// 상태 표현은 dot + 칩 (이모지 금지 원칙) — 토큰 클래스만 사용.
import type { MachineState } from '$lib/types/queue';
/** 머신 상태 한글 라벨 */
export const MACHINE_STATE_LABEL: Record<MachineState, string> = {
active: '가동',
deferred: '보류',
idle: '대기',
};
/** 상태 dot 색 — 가동=success / 보류=warning / 대기=faint */
export function machineDotClass(state: MachineState): string {
if (state === 'active') return 'bg-success';
if (state === 'deferred') return 'bg-warning';
return 'bg-faint';
}
/** 상태 칩 톤 — 가동=accent / 보류=warn / 대기=dim */
export function machineChipClass(state: MachineState): string {
if (state === 'active') return 'bg-accent/10 text-accent';
if (state === 'deferred') return 'bg-warning/10 text-warning';
return 'bg-surface-hover text-faint';
}
/** 처리율 표기 — 정수는 그대로, 소수는 한 자리 */
export function formatRate(n: number): string {
return Number.isInteger(n) ? n.toLocaleString() : n.toFixed(1);
}
/** ETA 분 → "약 N분/N시간 후 소진 예상" (추정치 — title 로 명시는 호출부 책임) */
export function etaPhrase(minutes: number): string {
if (minutes < 60) return `${Math.max(1, Math.round(minutes))}분 후 소진 예상`;
const hours = minutes / 60;
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
return `${text}시간 후 소진 예상`;
}
/** ETA 분 → 칩용 짧은 표기 ("약 4.6시간" / "약 12분") */
export function etaShort(minutes: number): string {
if (minutes < 60) return `${Math.max(1, Math.round(minutes))}`;
const hours = minutes / 60;
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
return `${text}시간`;
}
/** 경과 초 → "N분 전 / N시간 전 / N일 전" */
export function formatAgeSec(sec: number): string {
if (sec < 3600) return `${Math.max(1, Math.round(sec / 60))}분 전`;
if (sec < 86400) return `${Math.round(sec / 3600)}시간 전`;
return `${Math.round(sec / 86400)}일 전`;
}
/* (plan ds-board-engines-1)
* stage / () / . API label
* (raw ), · .
* / 1 (: 맥미니 ).
*/
export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
export interface FlowNodeDef {
key: string;
/** 노드 표시명 */
label: string;
/** 합산할 stage 키 (다중 = 같은 엔진 공유) */
stages: string[];
machine: FlowMachine;
/** 엔진/모델 표시명 (FE 정적 — 모델 교체 시 여기 수정) */
engine: string;
/** 보조 표기 (서비스/워커명) */
sub: string;
}
/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
export const FLOW_NODES: FlowNodeDef[] = [
{ key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
{ key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
{ key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
{ key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
{ key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
{ key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
];
/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
export const AUX_NODES: FlowNodeDef[] = [
{ key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
{ key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
{ key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
];
/** 머신 스트립 메타 — 모델 표기 단일 지점 */
export const MACHINE_META: Record<FlowMachine, { label: string; model: string }> = {
gpu: { label: 'GPU 서버', model: '특화 엔진' },
macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
};
/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
export const FLOW_STAGE_LABEL: Record<string, string> = {
extract: '추출',
classify: '분류',
summarize: '요약',
embed: '임베딩',
chunk: '청크',
preview: '미리보기',
stt: '전사',
thumbnail: '썸네일',
deep_summary: '심층분석',
markdown: '마크다운',
fulltext: '전문',
};
export function flowStageLabel(stage: string): string {
return FLOW_STAGE_LABEL[stage] ?? stage;
}
+37
View File
@@ -8,8 +8,11 @@
import { toasts, removeToast } from '$lib/stores/toast';
import { refresh as refreshPublicConfig } from '$lib/stores/config';
import { ui } from '$lib/stores/uiState.svelte';
import { queueOverview } from '$lib/stores/queueOverview';
import { MACHINE_STATE_LABEL, machineChipClass } from '$lib/utils/queueDisplay';
import Sidebar from '$lib/components/Sidebar.svelte';
import SystemStatusDot from '$lib/components/SystemStatusDot.svelte';
import QueueDrawer from '$lib/components/QueueDrawer.svelte';
import QuickMemoButton from '$lib/components/QuickMemoButton.svelte';
import IconButton from '$lib/components/ui/IconButton.svelte';
import Drawer from '$lib/components/ui/Drawer.svelte';
@@ -65,6 +68,15 @@
let showChrome = $derived($isAuthenticated && !NO_CHROME_PATHS.some(p => $page.url.pathname.startsWith(p)));
let showSidebar = $derived(showChrome && !NO_SIDEBAR_PATHS.some(p => $page.url.pathname.startsWith(p)));
// 처리 현황 스트립 (안6 라이트) — 60s 폴링 store 공유. fetch 실패/401 시
// store 가 null → 스트립 자체를 숨김 (silent 비차단, 로그인 페이지 동일).
let queue = $derived($queueOverview);
let queueMacbook = $derived(queue?.machines?.find((m) => m.key === 'macbook') ?? null);
function toggleQueueDrawer() {
if (ui.isDrawerOpen('queue')) ui.closeDrawer();
else ui.openDrawer('queue');
}
function handleKeydown(e) {
if (e.key === '/' && !['INPUT', 'TEXTAREA'].includes(document.activeElement?.tagName)) {
e.preventDefault();
@@ -162,6 +174,28 @@
</div>
</nav>
<!-- 전 페이지 상태 스트립 (안6 라이트) — 클릭 시 우측 처리 현황 드로어 토글 -->
{#if queue}
<button
type="button"
onclick={toggleQueueDrawer}
aria-expanded={ui.isDrawerOpen('queue')}
aria-label="처리 현황 자세히 보기"
class="flex items-center gap-3 px-4 py-1.5 border-b border-default bg-surface text-[11px] text-dim shrink-0 text-left hover:bg-surface-hover transition-colors overflow-x-auto"
>
<span class="flex items-center gap-1.5 shrink-0">
<span class="w-2 h-2 rounded-full {queue.totals.processing > 0 ? 'bg-success' : 'bg-faint'}"></span>
<strong class="text-text font-semibold tabular-nums">처리 중 {queue.totals.processing.toLocaleString()}</strong>
</span>
<span class="tabular-nums shrink-0">대기 <strong class="text-text">{queue.totals.pending.toLocaleString()}</strong></span>
<span class="tabular-nums shrink-0 {queue.totals.failed > 0 ? 'text-error font-semibold' : ''}">실패 <strong class={queue.totals.failed > 0 ? '' : 'text-text'}>{queue.totals.failed.toLocaleString()}</strong></span>
{#if queueMacbook}
<span class="text-[10px] font-bold rounded-full px-2 py-0.5 shrink-0 {machineChipClass(queueMacbook.state)}">맥북 {MACHINE_STATE_LABEL[queueMacbook.state]}</span>
{/if}
<span class="ml-auto flex items-center gap-0.5 text-faint shrink-0">자세히 <ChevronDown size={11} /></span>
</button>
{/if}
<!-- 메인: 데스크탑 상시 사이드바 + 콘텐츠 -->
<div class="flex-1 min-h-0 flex">
{#if showSidebar}
@@ -191,6 +225,9 @@
</Drawer>
</div>
<!-- 처리 현황 드로어 (안6 라이트, 스트립 클릭 시 우측) -->
<QueueDrawer />
<!-- 빠른 메모 FAB -->
<QuickMemoButton />
</div>
+98 -32
View File
@@ -13,6 +13,9 @@
import { domainBgClass, domainLabel } from '$lib/utils/domainSlug';
import { user } from '$lib/stores/auth';
import { api } from '$lib/api';
import { queueOverview, refreshQueueOverview } from '$lib/stores/queueOverview';
import ProcessingFlowBoard from '$lib/components/ProcessingFlowBoard.svelte';
import type { QueueOverview } from '$lib/types/queue';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
import {
@@ -125,6 +128,28 @@
preview: '미리보기', thumbnail: '썸네일',
};
// ─── 처리 머신 보드 (안2) + ETA (안5 라이트) — GET /api/queue/overview ───
// 홈은 30s 폴링 (store 기본 60s 위에 추가 — inFlight 합치기로 중복 호출 0).
// 백엔드 미배포/실패 시 store=null → 보드 자체가 조용히 생략 (silent 비차단).
let queue = $derived<QueueOverview | null>($queueOverview);
// 머신 담당 단계 라벨 — STAGE_LABEL 재사용 + overview 전용 단계 보강
// (backend services/queue_overview.py _STAGE_ORDER 와 동기), 미지 키는 raw
const QUEUE_STAGE_LABEL: Record<string, string> = {
...STAGE_LABEL,
summarize: '요약', chunk: '청크', markdown: '마크다운',
fulltext: '전문', deep_summary: '심층분석',
};
function queueStageLabel(stage: string): string {
return QUEUE_STAGE_LABEL[stage] ?? stage;
}
onMount(() => {
void refreshQueueOverview();
const handle = setInterval(() => void refreshQueueOverview(), 30_000);
return () => clearInterval(handle);
});
interface PipelineRow {
stage: string; label: string;
pending: number; processing: number; failed: number; total: number;
@@ -172,7 +197,20 @@
let totalProcessing = $derived(pipelineRows.reduce((s, r) => s + r.processing, 0));
let pipelineManualClosed = $state(false);
let pipelineOpen = $derived(pipelineManualClosed ? false : totalFailed > 0);
let pipelineOpen = $derived(
pipelineManualClosed ? false : (queue?.totals.failed ?? totalFailed) > 0
);
// 단계별 현황 (2026-06-11 피드백 재설계: 완료가 보여야 한다 — overview.stages 단일 소스)
// active = 오늘 움직임이 있는 단계만, idle = 전부 0 인 단계는 한 줄로 숨김.
let stageRows = $derived(queue?.stages ?? []);
let activeStageRows = $derived(
stageRows.filter((r) => r.pending + r.processing + r.failed + r.done_today > 0)
);
let idleStageRows = $derived(
stageRows.filter((r) => r.pending + r.processing + r.failed + r.done_today === 0)
);
let stageDoneToday = $derived(stageRows.reduce((s, r) => s + r.done_today, 0));
function formatAge(sec: number | null): string {
if (sec == null || sec <= 0) return '';
@@ -420,7 +458,12 @@
</div>
</div>
<!-- ═══ 파이프라인 상세 (실패 있을 때 자동 펼침) ═══ -->
<!-- ═══ 처리 머신 보드 v2 — 파이프라인 흐름 + 상세 패널 + 실패 드로어 (ds-board-engines-1) ═══ -->
{#if queue}
<ProcessingFlowBoard overview={queue} />
{/if}
<!-- ═══ 단계 상세 (기존 stage 테이블 — 접힘 강등, 실패 있을 때 자동 펼침) ═══ -->
<details
class="mt-5"
open={pipelineOpen}
@@ -429,44 +472,67 @@
<summary class="flex items-center justify-between px-5 py-3.5 bg-surface border border-default rounded-card cursor-pointer hover:bg-surface-hover transition-colors select-none list-none">
<span class="text-sm font-semibold text-text flex items-center gap-2">
<ChevronRight size={14} class="transition-transform details-chevron" />
파이프라인 상세
단계별 현황
</span>
<span class="text-xs text-dim flex items-center gap-2.5">
{#if totalFailed > 0}<span class="text-error font-medium">실패 {totalFailed}</span>{/if}
{#if totalPending > 0}<span>대기 {totalPending}</span>{/if}
{#if totalFailed === 0 && totalPending === 0}<span>처리 완료</span>{/if}
{#if queue}
{#if stageDoneToday > 0}<span class="text-success">오늘 {stageDoneToday.toLocaleString()} 완료</span>{/if}
{#if queue.totals.failed > 0}<span class="text-error font-medium">실패 {queue.totals.failed}</span>{/if}
{#if queue.totals.pending > 0}<span>대기 {queue.totals.pending.toLocaleString()}</span>{/if}
{#if stageDoneToday === 0 && queue.totals.failed === 0 && queue.totals.pending === 0}<span>모든 단계 한가함</span>{/if}
{:else}
{#if totalFailed > 0}<span class="text-error font-medium">실패 {totalFailed}</span>{/if}
{#if totalPending > 0}<span>대기 {totalPending}</span>{/if}
{/if}
</span>
</summary>
<div class="mt-2 px-5 py-4 bg-surface border border-default rounded-card">
<p class="text-xs text-dim mb-3">최근 24시간</p>
{#if pipelineRows.length > 0}
<div class="space-y-3">
{#each pipelineRows as row (row.stage)}
<div>
<div class="flex items-center justify-between text-xs mb-1.5">
<span class="text-dim">
{row.label}
{#if row.oldestPendingAgeSec && row.oldestPendingAgeSec > 600}
<span class="ml-1 text-warning" title="가장 오래된 pending 의 경과 시간">({formatAge(row.oldestPendingAgeSec)})</span>
{/if}
</span>
<span class="text-dim tabular-nums">
대기 <span class="text-text">{row.pending}</span> ·
처리 <span class="text-text">{row.processing}</span> ·
실패 <span class={row.failed > 0 ? 'text-error font-medium' : ''}>{row.failed}</span>
</span>
{#if queue}
{#if activeStageRows.length > 0}
<div class="space-y-3.5">
{#each activeStageRows as row (row.stage)}
{@const total = row.done_today + row.pending + row.processing}
{@const donePct = total > 0 ? (row.done_today / total) * 100 : 0}
{@const procPct = total > 0 ? (row.processing / total) * 100 : 0}
<div>
<div class="flex items-baseline justify-between text-xs mb-1.5 gap-2">
<span class="font-medium text-text flex items-center gap-1.5 whitespace-nowrap">
{queueStageLabel(row.stage)}
{#if row.processing > 0}
<span class="inline-block w-1.5 h-1.5 rounded-full bg-accent animate-pulse"></span>
<span class="text-accent font-normal">처리 중 {row.processing}</span>
{/if}
</span>
<span class="text-dim tabular-nums flex items-center gap-2.5 whitespace-nowrap">
{#if row.done_today > 0}<span class="text-success">오늘 {row.done_today.toLocaleString()} 완료</span>{/if}
{#if row.pending > 0}<span>대기 {row.pending.toLocaleString()}</span>{/if}
{#if row.failed > 0}<span class="text-error font-medium">실패 {row.failed}</span>{/if}
</span>
</div>
<!-- 게이지 = 이 단계의 오늘 진척 (완료 / 완료+대기) — 가득 찬 초록 = 다 끝남 -->
<div class="flex h-1.5 w-full overflow-hidden rounded-sm bg-bg" title="오늘 완료 {row.done_today.toLocaleString()} / 잔여 {row.pending.toLocaleString()}">
{#if donePct > 0}<div class="bg-success/70 h-full" style="width: {donePct}%"></div>{/if}
{#if procPct > 0}<div class="bg-accent h-full" style="width: {Math.max(procPct, 1)}%"></div>{/if}
</div>
{#if row.pending > 0 && row.oldest_pending_age_sec && row.oldest_pending_age_sec > 600}
<p class="text-[10px] mt-1 tabular-nums {row.oldest_pending_age_sec > 21600 ? 'text-warning' : 'text-faint'}">
가장 오래 기다린 항목 {formatAge(row.oldest_pending_age_sec)}
</p>
{/if}
</div>
<div class="flex h-1.5 w-full overflow-hidden rounded-sm bg-bg">
{#if row.pending > 0}<div class="bg-warning h-full" style="width: {(row.pending / pipelineMax) * 100}%"></div>{/if}
{#if row.processing > 0}<div class="bg-accent h-full" style="width: {(row.processing / pipelineMax) * 100}%"></div>{/if}
{#if row.failed > 0}<div class="bg-error h-full" style="width: {(row.failed / pipelineMax) * 100}%"></div>{/if}
</div>
</div>
{/each}
</div>
{/each}
</div>
{:else}
<p class="text-xs text-dim text-center py-3">대기·처리·실패 없음 — 모든 단계가 한가합니다</p>
{/if}
{#if idleStageRows.length > 0}
<p class="text-[11px] text-faint mt-4 pt-3 border-t border-default">
비어 있음: {idleStageRows.map((r) => queueStageLabel(r.stage)).join(' · ')}
</p>
{/if}
{:else}
<p class="text-xs text-dim text-center py-3">처리 작업 없음</p>
<p class="text-xs text-dim text-center py-3">현황을 불러오지 못했습니다</p>
{/if}
</div>
</details>
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen06 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen06 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4 (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4 (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(2560) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+8
View File
@@ -0,0 +1,8 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 docs 섀도 테이블 (eval 전용, 단일 statement).
-- 평가 = exact scan 이라 벡터 인덱스 없음 (인덱스 전략 = C-1 컷오버 소관).
CREATE TABLE IF NOT EXISTS documents_cand_qwen4m (
doc_id BIGINT PRIMARY KEY,
embed_input_hash TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,10 @@
-- Phase 2A (embedding-phase2a-1 E-1): 후보 임베딩 chunks 섀도 테이블 (eval 전용, 단일 statement).
CREATE TABLE IF NOT EXISTS document_chunks_cand_qwen4m (
id BIGINT PRIMARY KEY,
doc_id BIGINT NOT NULL,
chunk_index INTEGER,
section_title TEXT,
text TEXT,
embedding vector(1024) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
+2 -2
View File
@@ -126,11 +126,11 @@ async def test_deep_conversational_no_sources(client, monkeypatch):
@pytest.mark.asyncio
async def test_deep_probe_fail_503(client, monkeypatch):
"""probe 실패(router 미도달) → 첫 바이트 전 503 macbook_unavailable."""
"""probe 실패(router 미도달) → 첫 바이트 전 503 router_unreachable."""
monkeypatch.setattr(eid_chat, "_probe_router_reachable", _async_false)
r = await client.post("/api/eid/chat", json=_DEEP)
assert r.status_code == 503
assert r.json()["error_reason"] == "macbook_unavailable"
assert r.json()["error_reason"] == "router_unreachable"
@pytest.mark.asyncio
+5 -5
View File
@@ -104,7 +104,7 @@ async def test_anthropic_router_url_blocked(monkeypatch):
@pytest.mark.asyncio
async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
"""deep → qwen-macbook alias, system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
"""deep → mac-mini-default alias (맥북 백지화 2026-06-11), system 은 messages[0] 단일 주입, 라인 단위 정화 중계."""
seen: dict = {}
def handler(request: httpx.Request) -> httpx.Response:
@@ -139,7 +139,7 @@ async def test_deep_mode_alias_and_sse_line_rewrite(monkeypatch):
]
assert seen["url"].endswith("/v1/chat/completions")
body = seen["json"]
assert body["model"] == "qwen-macbook"
assert body["model"] == "mac-mini-default"
assert body["stream"] is True
assert body["max_tokens"] == 2048
assert body["temperature"] == 0.4
@@ -202,7 +202,7 @@ async def test_prestream_503_maps_reason(monkeypatch):
with pytest.raises(BackendUnavailable) as ei:
await anext(stream)
assert ei.value.reason == "macbook_unavailable"
assert ei.value.backend_name == "qwen-macbook"
assert ei.value.backend_name == "mac-mini-default"
finally:
await c.close()
@@ -253,7 +253,7 @@ async def test_prestream_400_raises_valueerror_failloud(monkeypatch):
c = EidAIClient()
try:
stream = c.call_stream("deep", _MSG, "sys")
with pytest.raises(ValueError, match="router rejected alias='qwen-macbook'"):
with pytest.raises(ValueError, match="router rejected alias='mac-mini-default'"):
await anext(stream)
finally:
await c.close()
@@ -290,7 +290,7 @@ async def test_stream_deadline_exceeded(monkeypatch):
async for _ in stream:
pass
assert ei.value.reason == "stream_deadline_exceeded"
assert ei.value.backend_name == "qwen-macbook"
assert ei.value.backend_name == "mac-mini-default"
finally:
await c.close()
+26
View File
@@ -0,0 +1,26 @@
{
"_meta": {
"plan": "embedding-phase2a-1 G-1",
"measured_at": "2026-06-12",
"serving": "Ollama 0.20.0 (GPU container `ollama`), endpoint = POST /api/embed (단일 고정 — legacy /api/embeddings 사용 금지)",
"invariant": "저장=조회 동일 모델+버전, 프롬프트는 역할별 고정 (문서=plain / 쿼리=instruct prefix)"
},
"instruct_prefix_pinned": "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: ",
"models": {
"qwen3-embedding:0.6b": {
"digest": "ac6da0dfba84", "size": "639MB", "dim": 1024, "l2_normalized": true
},
"qwen3-embedding:4b": {
"digest": "df5bd2e3c74c", "size": "2.5GB(Q4)", "dim": 2560, "l2_normalized": true,
"mrl_dimensions_option": {"supported": true, "dimensions=1024": {"dim": 1024, "l2_norm": 1.0, "note": "Ollama 가 truncate+재정규화까지 수행 — 쿼리측 MRL 은 dimensions 옵션으로 처리"}}
}
},
"asymmetric_prefix_effect_0.6b": {
"doc": "압력용기의 수압시험은 설계압력의 1.3배로 실시하며, 시험 중 용접부 누설 여부를 육안으로 확인한다.",
"query": "압력용기 수압시험 기준 압력은?",
"cos_doc_vs_query_plain": 0.7446,
"cos_doc_vs_query_instruct": 0.7606,
"cos_plain_vs_instruct_query": 0.882,
"verdict": "prefix 가 쿼리 임베딩을 실질 변화시키고(0.882) 관련쌍 유사도를 올림(+0.016) — 비대칭 사용 필수"
}
}
+32
View File
@@ -0,0 +1,32 @@
{
"id": "chatcmpl-80cd8ddc-7788-4605-b40e-3975fe7e1326",
"object": "chat.completion",
"created": 1781149952,
"model": "/Users/hyungi/mlx-models/Qwen3.6-27B-8bit",
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": "\uc81c\uacf5\ub41c \ubb38\uc11c\ub294 \uc555\ub825\uc6a9\uae30 \uac80\uc0ac\uc758 \uae30\uc900\uc774 \ub418\ub294 \uaddc\uc815\uc744 \uba85\uc2dc\ud558\uace0 \uc788\uc2b5\ub2c8\ub2e4. \ud575\uc2ec \ub0b4\uc6a9\uc740 \uc555\ub825\uc6a9\uae30\uc5d0 \ub300\ud55c \ubaa8\ub4e0 \uac80\uc0ac \uc808\ucc28\uc640 \uae30\uc900\uc774 'ASME Section VIII Div 1'\uc774\ub77c\ub294 \uad6d\uc81c\uc801\uc73c\ub85c \uc778\uc815\ubc1b\ub294 \uc555\ub825\uc6a9\uae30 \uc124\uacc4 \ubc0f \uc81c\uc791 \uaddc\uc815\uc5d0 \ub530\ub77c \uc5c4\uaca9\ud558\uac8c \uc218\ud589\ub418\uc5b4\uc57c \ud55c\ub2e4\ub294 \uac83\uc785\ub2c8\ub2e4. \uc774\ub294 \uc548\uc804\uc131\uacfc \uc2e0\ub8b0\uc131\uc744 \ubcf4\uc7a5\ud558\uae30 \uc704\ud55c \ud544\uc218\uc801\uc778 \uc694\uad6c\uc0ac\ud56d\uc73c\ub85c, \ud574\ub2f9 \uaddc\uc815\uc744 \uc900\uc218\ud568\uc73c\ub85c\uc368 \uc555\ub825\uc6a9\uae30\uc758 \uad6c\uc870\uc801 \ubb34\uacb0\uc131\uacfc \uc6b4\uc601 \uc548\uc804\uc131\uc744 \ud655\ubcf4\ud560 \uc218 \uc788\uc2b5\ub2c8\ub2e4. \ub530\ub77c\uc11c \uad00\ub828 \uc5c5\ubb34 \uc218\ud589 \uc2dc \ubc18\ub4dc\uc2dc \uc774 \uaddc\uc815\uc744 \ucc38\uc870\ud558\uc5ec \uac80\uc0ac\ub97c \uc9c4\ud589\ud574\uc57c \ud569\ub2c8\ub2e4.",
"reasoning": null,
"tool_calls": null,
"tool_call_id": null,
"name": null
},
"logprobs": null
}
],
"usage": {
"prompt_tokens": 44,
"completion_tokens": 118,
"total_tokens": 162,
"prompt_tokens_details": {
"cached_tokens": 0
},
"prompt_tps": 0.0,
"generation_tps": 0.0,
"peak_memory": 29.804702642
}
}
+97
View File
@@ -0,0 +1,97 @@
"""Phase 2A E-4 비교기 — baseline vs 후보 run CSV 들의 per-query 판정.
python tests/search_eval/compare_runs.py \
--baseline baselines/<exact 재측정>.csv \
--cand qwen06=<...>.csv --cand qwen4=<...>.csv --cand qwen4m=<...>.csv \
[--epsilon 0.01] [--bootstrap 2000]
판정 출력(plan r3 E-4): 전체 graded NDCG 평균 delta · per-query win/loss/tie(|d|<ε=tie)
· 부트스트랩 95% CI · 카테고리별 평균 · 상위 개선/퇴행 쿼리. failure_expected/error 제외.
"""
from __future__ import annotations
import argparse
import csv
import random
import statistics
from pathlib import Path
def load(path: str) -> dict[str, dict]:
out = {}
with Path(path).open(encoding="utf-8") as f:
for row in csv.DictReader(f):
if row.get("failure_expected", "").lower() in ("true", "1"):
continue
if row.get("error"):
continue
try:
row["_g"] = float(row["graded_ndcg_at_10"])
except (TypeError, ValueError):
continue
out[row["id"]] = row
return out
def bootstrap_ci(deltas: list[float], n: int, seed: int = 42) -> tuple[float, float]:
rng = random.Random(seed)
means = sorted(
statistics.mean(rng.choices(deltas, k=len(deltas))) for _ in range(n)
)
return means[int(0.025 * n)], means[int(0.975 * n)]
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--baseline", required=True)
p.add_argument("--cand", action="append", required=True, metavar="name=csv")
p.add_argument("--epsilon", type=float, default=0.01)
p.add_argument("--bootstrap", type=int, default=2000)
a = p.parse_args()
base = load(a.baseline)
print(f"baseline: {a.baseline} — scored {len(base)}, "
f"graded NDCG mean {statistics.mean(r['_g'] for r in base.values()):.4f}")
for spec in a.cand:
name, path = spec.split("=", 1)
cand = load(path)
ids = sorted(set(base) & set(cand))
if len(ids) != len(base):
print(f"{name}: 교집합 {len(ids)} != baseline {len(base)} — 누락 쿼리 확인")
deltas = [cand[i]["_g"] - base[i]["_g"] for i in ids]
mean_b = statistics.mean(base[i]["_g"] for i in ids)
mean_c = statistics.mean(cand[i]["_g"] for i in ids)
win = sum(1 for d in deltas if d > a.epsilon)
loss = sum(1 for d in deltas if d < -a.epsilon)
tie = len(deltas) - win - loss
lo, hi = bootstrap_ci(deltas, a.bootstrap)
decided = win + loss
win_rate = (win / decided * 100) if decided else 0.0
print(f"\n== {name} ==")
print(f" graded NDCG: {mean_b:.4f}{mean_c:.4f} (delta {mean_c-mean_b:+.4f}, "
f"bootstrap95% [{lo:+.4f}, {hi:+.4f}])")
print(f" per-query: win {win} / loss {loss} / tie {tie} (ε={a.epsilon}) — "
f"win-rate(결정전) {win_rate:.0f}%")
cats: dict[str, list[float]] = {}
for i in ids:
cats.setdefault(base[i].get("category", "?"), []).append(
cand[i]["_g"] - base[i]["_g"]
)
for c in sorted(cats):
ds = cats[c]
cb = statistics.mean(base[i]["_g"] for i in ids if base[i].get("category") == c)
cc = statistics.mean(cand[i]["_g"] for i in ids if base[i].get("category") == c)
print(f" {c:<18} {cb:.3f}{cc:.3f} ({statistics.mean(ds):+.3f}, n={len(ds)})")
ranked = sorted(ids, key=lambda i: cand[i]["_g"] - base[i]["_g"])
worst = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[:3]]
best = [(i, round(cand[i]['_g']-base[i]['_g'],3)) for i in ranked[-3:][::-1]]
print(f" 개선 top3 {best} / 퇴행 top3 {worst}")
if __name__ == "__main__":
main()
+150
View File
@@ -0,0 +1,150 @@
"""2026-06-12 fair-share 번들 — gate capacity 일반화 / call_deep_or_defer cfg / drain classify.
worker-process 레벨(DB 필요) deep 폴백·classify drain 라이브 E2E 검증하고,
여기서는 메커니즘의 seam 단위 검증한다.
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
import httpx
import pytest
from core.config import settings
from services.search.llm_gate import Priority, _reset_for_test, acquire_mlx_gate, gate_status
@pytest.fixture(autouse=True)
def _reset_gate(monkeypatch):
monkeypatch.setattr(settings, "mlx_gate_concurrency", 2)
_reset_for_test()
yield
_reset_for_test()
# ─── gate capacity 2 ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_two_concurrent_holders_overlap():
"""capacity=2: 두 holder 가 동시에 inflight — 서로를 기다리지 않는다."""
log: list = []
async def hold(label: str):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("in", label))
await asyncio.sleep(0.05)
log.append(("out", label))
await asyncio.gather(hold("a"), hold("b"))
# 둘 다 진입한 뒤에 첫 release 가 나와야 함 (overlap 증명)
assert log[0][0] == "in" and log[1][0] == "in"
@pytest.mark.asyncio
async def test_third_waits_until_slot_frees():
"""capacity=2: 3번째는 대기, 첫 release 후 진입."""
order: list = []
release_a = asyncio.Event()
async def hold(label: str, wait_event: asyncio.Event | None):
async with acquire_mlx_gate(Priority.BACKGROUND):
order.append(("in", label))
if wait_event:
await wait_event.wait()
else:
await asyncio.sleep(0.01)
order.append(("out", label))
t_a = asyncio.create_task(hold("a", release_a))
t_b = asyncio.create_task(hold("b", release_a))
await asyncio.sleep(0.02) # a, b inflight 진입 대기
assert ("in", "a") in order and ("in", "b") in order
assert gate_status()["inflight"] == 2
t_c = asyncio.create_task(hold("c", None))
await asyncio.sleep(0.02)
assert ("in", "c") not in order # 슬롯 2 점유 중 — c 는 대기
assert gate_status()["waiters"] == 1
release_a.set()
await asyncio.gather(t_a, t_b, t_c)
assert ("in", "c") in order
@pytest.mark.asyncio
async def test_capacity_one_serializes():
"""concurrency=1 이면 기존 직렬화 그대로 (무회귀)."""
from core.config import settings as s
s_backup = s.mlx_gate_concurrency
s.mlx_gate_concurrency = 1
try:
_reset_for_test()
log: list = []
async def hold(label: str):
async with acquire_mlx_gate(Priority.BACKGROUND):
log.append(("in", label))
await asyncio.sleep(0.02)
log.append(("out", label))
await asyncio.gather(hold("a"), hold("b"))
# 직렬: in/out 쌍이 겹치지 않음
assert [e[0] for e in log] == ["in", "out", "in", "out"]
finally:
s.mlx_gate_concurrency = s_backup
# ─── call_deep_or_defer cfg override ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_override():
"""cfg 지정 시 deep 슬롯 대신 해당 config 로 _request 호출."""
from ai.client import call_deep_or_defer
seen: dict = {}
class FakeClient:
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
async def _request(self, cfg, prompt, system=None):
seen["cfg"] = cfg
return "ok"
async def call_deep(self, prompt, system=None):
seen["cfg"] = self.ai.deep
return "ok"
override = SimpleNamespace(model="deep-endpoint-triage-sampling", temperature=0.0)
out = await call_deep_or_defer(FakeClient(), "p", cfg=override)
assert out == "ok"
assert seen["cfg"] is override
@pytest.mark.asyncio
async def test_call_deep_or_defer_cfg_still_defers():
"""cfg 경로에서도 보류 분류(502/503/TransportError → StageDeferred) 동일 적용."""
from ai.client import call_deep_or_defer
from models.queue import StageDeferred
class FakeClient:
ai = SimpleNamespace(deep=SimpleNamespace(model="deep-slot"))
async def _request(self, cfg, prompt, system=None):
raise httpx.ConnectError("down")
with pytest.raises(StageDeferred):
await call_deep_or_defer(FakeClient(), "p", cfg=SimpleNamespace(model="x"))
# ─── drain stages ────────────────────────────────────────────────────────────
def test_drain_stages_include_classify():
from workers.queue_drain import DRAIN_STAGES
assert set(DRAIN_STAGES) == {"summarize", "deep_summary", "classify"}
+160
View File
@@ -0,0 +1,160 @@
"""ds-macbook-offload-1 P2-4 — deep 슬롯 라우팅 / 보류(StageDeferred) / drain 가드 테스트.
DB 불요(unit) AIClient __new__ settings 우회, drain 가드는 settings monkeypatch.
통합(보류 백오프 DB 기록, claim 경합) P3-2 E2E 게이트에서 라이브 실측.
fixture = tests/fixtures/qwen_router_chat_completion.json (2026-06-11 라이브 박제
라우터 :8890 경유 model=qwen-macbook, production 호출 형상과 동일 body, 13.2s 실측).
"""
import json
from pathlib import Path
from types import SimpleNamespace
import httpx
import pytest
from ai.client import AIClient, call_deep_or_defer, is_deferrable_error
from models.queue import StageDeferred
FIXTURE = Path(__file__).parent / "fixtures" / "qwen_router_chat_completion.json"
def _client(deep_cfg, primary_cfg):
"""settings 비의존 AIClient — __init__ 우회 후 ai 슬롯만 주입."""
client = AIClient.__new__(AIClient)
client.ai = SimpleNamespace(deep=deep_cfg, primary=primary_cfg)
return client
def _http_status_error(status: int) -> httpx.HTTPStatusError:
req = httpx.Request("POST", "http://router:8890/v1/chat/completions")
resp = httpx.Response(status, request=req)
return httpx.HTTPStatusError(f"status {status}", request=req, response=resp)
# ─── is_deferrable_error 분류 ──────────────────────────────────────────────
@pytest.mark.parametrize("exc", [
_http_status_error(503), # 라우터 upstream_cold/editor_busy/warming
_http_status_error(502), # 라우터: upstream 연결 실패/생성 중 절단 변환
_http_status_error(504),
httpx.ConnectError("connection refused"), # 라우터 자체 불가
httpx.ConnectTimeout("connect timeout"),
httpx.ReadTimeout("read timeout"), # DS↔라우터 구간 절단
httpx.ReadError("connection reset"),
httpx.RemoteProtocolError("server disconnected"),
])
def test_deferrable_errors(exc):
assert is_deferrable_error(exc) is True
@pytest.mark.parametrize("exc", [
_http_status_error(400), # unknown alias 등 — 설정 오류는 보류 아님
_http_status_error(500),
ValueError("parse"),
RuntimeError("boom"),
])
def test_non_deferrable_errors(exc):
assert is_deferrable_error(exc) is False
# ─── call_deep 슬롯 선택 ───────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_call_deep_uses_deep_slot():
deep = SimpleNamespace(model="qwen-macbook")
primary = SimpleNamespace(model="gemma-26b")
client = _client(deep, primary)
captured = {}
async def fake_request(cfg, prompt, system=None):
captured["cfg"] = cfg
return "ok"
client._request = fake_request
assert await client.call_deep("p") == "ok"
assert captured["cfg"] is deep
@pytest.mark.asyncio
async def test_call_deep_falls_back_to_primary_when_slot_absent():
"""슬롯 부재 = 기능 미활성 (방어적 primary — silent 강등이 아니라 기존 경로 그대로)."""
primary = SimpleNamespace(model="gemma-26b")
client = _client(None, primary)
captured = {}
async def fake_request(cfg, prompt, system=None):
captured["cfg"] = cfg
return "ok"
client._request = fake_request
await client.call_deep("p")
assert captured["cfg"] is primary
# ─── call_deep_or_defer 보류 변환 ──────────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize("exc", [
_http_status_error(503),
httpx.ConnectError("refused"),
httpx.ReadTimeout("cut mid-generation"),
])
async def test_defer_conversion(exc):
client = _client(SimpleNamespace(model="qwen-macbook"), None)
async def fail_request(cfg, prompt, system=None):
raise exc
client._request = fail_request
with pytest.raises(StageDeferred):
await call_deep_or_defer(client, "p")
@pytest.mark.asyncio
async def test_non_deferrable_propagates():
"""400/일반 오류는 StageDeferred 아님 — 호출자 기존 실패 경로로 전파."""
client = _client(SimpleNamespace(model="qwen-macbook"), None)
async def fail_request(cfg, prompt, system=None):
raise _http_status_error(400)
client._request = fail_request
with pytest.raises(httpx.HTTPStatusError):
await call_deep_or_defer(client, "p")
def test_stage_deferred_carries_backoff():
e = StageDeferred("macbook_unavailable:ConnectError")
assert e.retry_after_minutes == 30
def test_router_fixture_shape():
"""_request 파싱 경로(choices[0].message.content)가 라우터 실응답 형상과 일치하는지 고정."""
data = json.loads(FIXTURE.read_text())
content = data["choices"][0]["message"]["content"]
assert isinstance(content, str) and len(content) > 0
assert data["choices"][0]["message"]["role"] == "assistant"
# 라우터가 alias 를 upstream 로컬 경로로 치환해 응답 — 실처리 모델 추적 가능
assert "Qwen3.6-27B-8bit" in data["model"]
# ─── drain 가드 (silent 강등 금지) ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_drain_requires_deep_slot(monkeypatch):
import workers.queue_drain as qd
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=None)))
with pytest.raises(SystemExit):
await qd.drain("summarize", 1)
@pytest.mark.asyncio
async def test_drain_rejects_non_drain_stage(monkeypatch):
"""classify 는 2026-06-12 fair-share 로 DRAIN_STAGES 합류 — 거부 대상은 extract 등."""
import workers.queue_drain as qd
monkeypatch.setattr(qd, "settings", SimpleNamespace(ai=SimpleNamespace(deep=object())))
with pytest.raises(SystemExit):
await qd.drain("extract", 1)
+96
View File
@@ -0,0 +1,96 @@
"""Phase 2A (embedding-phase2a-1) — Qwen 후보 디스패처/쿼리 임베딩 단위 테스트."""
from __future__ import annotations
import pytest
from services.search import retrieval_service as rs
def test_resolve_qwen_backends():
for slug in ("cand_qwen06", "cand_qwen4", "cand_qwen4m"):
cfg = rs._resolve_backend(slug)
assert cfg["docs_table"].startswith("documents_cand_qwen")
assert cfg["chunks_table"].startswith("document_chunks_cand_qwen")
assert cfg["embed_kind"] == "ollama"
# 테이블명이 2단계 SQL allowlist 도 통과해야 함 (R2-B1)
assert rs._VALID_DOCS_TABLE.match(cfg["docs_table"])
assert rs._VALID_CHUNKS_TABLE.match(cfg["chunks_table"])
assert rs._resolve_backend("baseline") is None
with pytest.raises(ValueError):
rs._resolve_backend("cand_unknown")
def test_qwen4m_has_mrl_dimensions():
assert rs._resolve_backend("cand_qwen4m")["embed_dimensions"] == 1024
assert "embed_dimensions" not in rs._resolve_backend("cand_qwen4")
class _FakeResp:
def __init__(self, embs):
self._embs = embs
def raise_for_status(self):
return None
def json(self):
return {"embeddings": self._embs}
class _FakeClient:
"""httpx.AsyncClient 대역 — post body 캡처."""
captured: dict = {}
def __init__(self, *a, **k):
pass
async def __aenter__(self):
return self
async def __aexit__(self, *a):
return False
async def post(self, url, json=None):
_FakeClient.captured = {"url": url, "json": json}
dim = (json or {}).get("dimensions") or 1024
return _FakeResp([[0.1] * dim])
@pytest.mark.asyncio
async def test_ollama_query_embed_applies_instruct_prefix(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen06")
out = await rs._embed_query_via_ollama(cfg, "압력용기 수압시험")
assert out is not None and len(out) == 1024
body = _FakeClient.captured["json"]
assert body["model"] == "qwen3-embedding:0.6b"
assert body["input"][0].startswith(rs.QWEN3_QUERY_INSTRUCT)
assert body["input"][0].endswith("압력용기 수압시험")
assert "dimensions" not in body
@pytest.mark.asyncio
async def test_ollama_query_embed_mrl_dimensions(monkeypatch):
import httpx
monkeypatch.setattr(httpx, "AsyncClient", _FakeClient)
cfg = rs._resolve_backend("cand_qwen4m")
out = await rs._embed_query_via_ollama(cfg, "q")
assert _FakeClient.captured["json"]["dimensions"] == 1024
assert len(out) == 1024
@pytest.mark.asyncio
async def test_ollama_query_embed_failure_returns_none(monkeypatch):
import httpx
class _Boom(_FakeClient):
async def post(self, url, json=None):
raise httpx.ConnectError("down")
monkeypatch.setattr(httpx, "AsyncClient", _Boom)
cfg = rs._resolve_backend("cand_qwen06")
assert await rs._embed_query_via_ollama(cfg, "q") is None
+202
View File
@@ -0,0 +1,202 @@
"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트.
홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉).
-held 스테이지는 기존과 동일하게 처리된다.
"""
import pytest
from core.config import Settings, settings
from workers import digest_worker, queue_consumer
def _fake_consumer_env(monkeypatch, held):
processed = []
async def fake_process(stage, worker):
processed.append(stage)
async def fake_reset(stages, threshold):
return None
monkeypatch.setattr(queue_consumer, "_process_stage", fake_process)
monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset)
monkeypatch.setattr(
queue_consumer, "_load_workers",
lambda: {
s: object()
for s in (queue_consumer.MAIN_QUEUE_STAGES
+ queue_consumer.FAST_QUEUE_STAGES + ["markdown"])
},
)
monkeypatch.setattr(queue_consumer, "_hold_logged", False)
monkeypatch.setattr(settings, "pipeline_held_stages", held)
return processed
def test_settings_default_empty():
"""미설정 시 빈 리스트 = 무동작 (기존 동작 무회귀)."""
assert Settings().pipeline_held_stages == []
@pytest.mark.asyncio
async def test_consume_queue_skips_held_stages(monkeypatch):
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary"]
)
await queue_consumer.consume_queue()
assert "classify" not in processed
assert "summarize" not in processed
assert "deep_summary" not in processed
# 특화 스테이지는 계속 처리 (embed/chunk 는 2026-06-12 fast 컨슈머로 분리)
for stage in ("extract", "stt", "fulltext"):
assert stage in processed
@pytest.mark.asyncio
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_queue()
assert processed == list(queue_consumer.MAIN_QUEUE_STAGES)
@pytest.mark.asyncio
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
"""fast 컨슈머(2026-06-12 분리) = embed/chunk 전용, LLM 사이클과 디커플."""
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_fast_queue()
assert processed == ["embed", "chunk"]
@pytest.mark.asyncio
async def test_fast_consumer_respects_hold(monkeypatch):
processed = _fake_consumer_env(monkeypatch, ["embed"])
await queue_consumer.consume_fast_queue()
assert processed == ["chunk"]
def test_fast_split_invariants():
"""세 컨슈머 stage 집합 disjoint + embed/chunk 배치 상향 회귀 가드."""
main = set(queue_consumer.MAIN_QUEUE_STAGES)
fast = set(queue_consumer.FAST_QUEUE_STAGES)
md = set(queue_consumer.MARKDOWN_QUEUE_STAGES)
assert not (main & fast) and not (main & md) and not (fast & md)
assert fast == {"embed", "chunk"}
assert queue_consumer.BATCH_SIZE["embed"] >= 10
assert queue_consumer.BATCH_SIZE["chunk"] >= 10
@pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch):
"""markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary", "digest"]
)
await queue_consumer.consume_markdown_queue()
assert processed == ["markdown"]
@pytest.mark.asyncio
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])
await digest_worker.run()
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {"clusters": 0}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", [])
await digest_worker.run()
assert called["pipeline"] is True
@pytest.mark.asyncio
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
from workers import briefing_worker
called = {"pipeline": False}
async def fake_pipeline(target_date):
called["pipeline"] = True
return {}
monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])
assert await briefing_worker.run() is None
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_study_explanation_consumer_held(monkeypatch):
from workers import study_queue_consumer
touched = []
async def fake_reset():
touched.append("reset")
monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])
await study_queue_consumer.consume_study_queue()
assert touched == []
@pytest.mark.asyncio
async def test_study_consumers_held_no_db_touch(monkeypatch):
"""held 시 stale reset 포함 DB 접근 0 — claim 미발생 실증."""
from workers import study_memo_card_jobs_consumer, study_session_queue_consumer
touched = []
async def fake_reset_session():
touched.append("session_reset")
async def fake_reset_card():
touched.append("card_reset")
monkeypatch.setattr(
study_session_queue_consumer, "reset_stale_session_jobs", fake_reset_session
)
monkeypatch.setattr(
study_memo_card_jobs_consumer, "reset_stale_card_jobs", fake_reset_card
)
monkeypatch.setattr(
settings, "pipeline_held_stages",
["study_session_analysis", "study_memo_card"],
)
await study_session_queue_consumer.consume_study_session_queue()
await study_memo_card_jobs_consumer.consume_study_memo_card_queue()
assert touched == []
+8 -2
View File
@@ -20,8 +20,14 @@ from services.search.llm_gate import (
@pytest.fixture(autouse=True)
def _reset_gate():
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다)."""
def _reset_gate(monkeypatch):
"""각 테스트 시작 시 gate 상태 reset (fresh event loop 마다).
2026-06-12 capacity 일반화 이후 파일의 직렬화 가정 보존을 위해
concurrency=1 고정 (capacity>1 동작은 test_fair_share.py 커버).
"""
from core.config import settings
monkeypatch.setattr(settings, "mlx_gate_concurrency", 1)
_reset_for_test()
yield
_reset_for_test()
+392
View File
@@ -0,0 +1,392 @@
"""GET /api/queue/overview 판정부 단위테스트 — DB 불요 (plan ds-processing-ui-6an).
services/queue_overview SQL 수집부와 분리된 순수 판정 함수
(stage_machine_map / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes / rows_to_* / display_title)
mock 행으로 검증한다. 통합( SQL) 배포 라이브 smoke 확인.
"""
from datetime import datetime
from zoneinfo import ZoneInfo
from services.queue_overview import (
build_machines,
build_summarize_eta,
build_totals,
build_trend,
compose_overview,
compute_eta_minutes,
display_title,
rows_to_stage_stats,
rows_to_summarize_split,
stage_machine_map,
)
KST = ZoneInfo("Asia/Seoul")
def _stage(**kw) -> dict:
"""stage 통계 1건 — 미지정 필드 0."""
base = {
"pending": 0, "processing": 0, "failed": 0,
"done_1h": 0, "done_today": 0, "done_15m": 0,
"deferred_pending": 0, "created_1h": 0,
}
base.update(kw)
return base
def _split(macbook: dict | None = None, macmini: dict | None = None) -> dict:
"""summarize 풀 완료 실적 split — 미지정 0."""
zero = {"done_1h": 0, "done_today": 0, "done_15m": 0}
return {
"macbook": {**zero, **(macbook or {})},
"macmini": {**zero, **(macmini or {})},
}
def _machine(machines: list[dict], key: str) -> dict:
return next(m for m in machines if m["key"] == key)
# ─── stage→machine 귀속 맵 ────────────────────────────────────────────────────
def test_stage_machine_map_deep_enabled():
smap = stage_machine_map(deep_enabled=True)
for s in ("extract", "embed", "chunk", "markdown", "preview", "thumbnail", "fulltext", "stt"):
assert smap[s] == "gpu"
assert smap["classify"] == "macmini"
assert smap["summarize"] == "macmini"
assert smap["deep_summary"] == "macbook"
def test_stage_machine_map_deep_disabled():
"""deep 슬롯 부재 시 deep_summary 도 macmini 귀속."""
smap = stage_machine_map(deep_enabled=False)
assert smap["deep_summary"] == "macmini"
# ─── 머신 카드 귀속 합산 ──────────────────────────────────────────────────────
def test_gpu_stage_counts_attribution():
stats = {
"extract": _stage(pending=3, processing=1, done_1h=5, done_today=9, done_15m=1),
"stt": _stage(failed=2, done_1h=1, done_today=2),
}
machines = build_machines(stats, _split(), [], deep_enabled=True)
gpu = _machine(machines, "gpu")
assert (gpu["pending"], gpu["processing"], gpu["failed"]) == (3, 1, 2)
assert (gpu["done_1h"], gpu["done_today"]) == (6, 11)
# gpu 의 stages 는 정적 8종 전부 (집계 0 이어도 표시)
assert gpu["stages"] == [
"extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt",
]
def test_summarize_pool_split_attribution():
"""summarize pending/failed = macmini 귀속, 완료 실적은 split 로 분리 —
stage-level summarize done 수치는 카드에 이중 합산되지 않는다."""
stats = {
"classify": _stage(done_1h=2, done_today=3),
"summarize": _stage(pending=7, failed=1, done_1h=10, done_today=20),
}
split = _split(macbook={"done_1h": 4, "done_today": 8}, macmini={"done_1h": 6, "done_today": 12})
machines = build_machines(stats, split, [], deep_enabled=True)
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 7 and macmini["failed"] == 1
assert macmini["done_1h"] == 2 + 6 # classify + macmini 몫 (10 아님)
assert macmini["done_today"] == 3 + 12
assert macbook["done_1h"] == 4 and macbook["done_today"] == 8
assert macbook["pending"] == 0 # 풀 pending 은 macmini 만
def test_deep_disabled_deep_summary_counts_to_macmini():
stats = {"deep_summary": _stage(pending=2, processing=1, done_1h=3, done_today=4)}
machines = build_machines(stats, _split(), [], deep_enabled=False)
macmini = _machine(machines, "macmini")
macbook = _machine(machines, "macbook")
assert macmini["pending"] == 2 and macmini["processing"] == 1
assert macmini["done_1h"] == 3 and macmini["done_today"] == 4
assert macbook["stages"] == [] and macbook["pending"] == 0
assert _machine(machines, "macmini")["stages"] == ["classify", "summarize", "deep_summary"]
def test_deferred_pending_always_on_macbook_card():
"""보류(deferred_until 미래)는 summarize+deep_summary 합산으로 macbook 카드 귀속.
deep 슬롯 유무와 무관 (보류 = 맥북 불가 신호)."""
stats = {
"summarize": _stage(pending=5, deferred_pending=2),
"deep_summary": _stage(pending=1, deferred_pending=1),
}
for deep_enabled in (True, False):
machines = build_machines(stats, _split(), [], deep_enabled=deep_enabled)
assert _machine(machines, "macbook")["deferred_pending"] == 3
assert _machine(machines, "gpu")["deferred_pending"] == 0
assert _machine(machines, "macmini")["deferred_pending"] == 0
# ─── state 판정 ───────────────────────────────────────────────────────────────
def test_macbook_state_active_wins_over_deferred_while_working():
"""가동 > 보류 (사용자 피드백 2026-06-11): 일하고 있으면 백오프 잔여가 있어도 '가동'.
보류 건수는 deferred_pending 필드가 별도로 전달 카드 라인이 표시.
"""
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
split = _split(macbook={"done_15m": 3})
machines = build_machines(stats, split, [], deep_enabled=True)
mb = _machine(machines, "macbook")
assert mb["state"] == "active"
assert mb["deferred_pending"] == 1
def test_macbook_state_deferred_only_when_not_working():
"""일이 멈춰 있고(처리 0·최근 완료 0) 백오프만 쌓인 상태에서만 '보류'."""
stats = {"summarize": _stage(pending=1, deferred_pending=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "deferred"
def test_macbook_state_active_on_recent_qwen_done():
split = _split(macbook={"done_15m": 1})
machines = build_machines({}, split, [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "active"
def test_macbook_state_idle():
machines = build_machines({}, _split(), [], deep_enabled=True)
assert _machine(machines, "macbook")["state"] == "idle"
def test_gpu_state_active_on_processing():
stats = {"extract": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_active_on_recent_done():
stats = {"embed": _stage(done_15m=2)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "active"
def test_gpu_state_idle_when_old_done_only():
stats = {"embed": _stage(done_1h=5, done_today=9)} # 15분 내 완료 없음
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "gpu")["state"] == "idle"
def test_macmini_state_not_active_on_macbook_pool_done():
"""summarize 풀 완료가 전부 macbook 몫이면 macmini 는 active 아님 (귀속 기준)."""
stats = {"summarize": _stage(done_15m=1)}
split = _split(macbook={"done_15m": 1})
machines = build_machines(stats, split, [], deep_enabled=True)
assert _machine(machines, "macmini")["state"] == "idle"
def test_macmini_state_active_on_summarize_processing():
stats = {"summarize": _stage(processing=1)}
machines = build_machines(stats, _split(), [], deep_enabled=True)
assert _machine(machines, "macmini")["state"] == "active"
# ─── current 귀속 ─────────────────────────────────────────────────────────────
def test_current_summarize_to_macmini_max_two():
rows = [
{"stage": "summarize", "document_id": 1, "title": "문서A", "original_filename": None, "file_path": None},
{"stage": "summarize", "document_id": 2, "title": "문서B", "original_filename": None, "file_path": None},
{"stage": "summarize", "document_id": 3, "title": "문서C", "original_filename": None, "file_path": None},
{"stage": "extract", "document_id": 4, "title": "문서D", "original_filename": None, "file_path": None},
]
machines = build_machines({}, _split(), rows, deep_enabled=True)
macmini = _machine(machines, "macmini")
gpu = _machine(machines, "gpu")
assert [c["document_id"] for c in macmini["current"]] == [1, 2] # 최대 2건
assert macmini["current"][0] == {"document_id": 1, "title": "문서A", "stage": "summarize"}
assert [c["document_id"] for c in gpu["current"]] == [4]
assert _machine(machines, "macbook")["current"] == []
def test_current_deep_summary_follows_deep_slot():
rows = [{"stage": "deep_summary", "document_id": 9, "title": "심층", "original_filename": None, "file_path": None}]
enabled = build_machines({}, _split(), rows, deep_enabled=True)
disabled = build_machines({}, _split(), rows, deep_enabled=False)
assert _machine(enabled, "macbook")["current"][0]["document_id"] == 9
assert _machine(disabled, "macmini")["current"][0]["document_id"] == 9
def test_display_title_fallback_chain():
assert display_title({"document_id": 1, "title": "제목"}) == "제목"
assert display_title({"document_id": 1, "title": None, "original_filename": "a.pdf"}) == "a.pdf"
assert display_title(
{"document_id": 1, "title": None, "original_filename": None, "file_path": "/documents/PKM/Inbox/b.hwp"}
) == "b.hwp"
assert display_title(
{"document_id": 7, "title": None, "original_filename": None, "file_path": None}
) == "문서 #7"
# ─── summarize ETA ────────────────────────────────────────────────────────────
def test_eta_minutes_positive_drain():
# 순소화 6건/h, 잔량 30건 → 300분
assert compute_eta_minutes(30, 10, 4) == 300
def test_eta_minutes_null_when_not_draining():
assert compute_eta_minutes(30, 4, 10) is None # 유입 > 소화
assert compute_eta_minutes(30, 5, 5) is None # 동률도 null
assert compute_eta_minutes(30, 0, 0) is None
def test_eta_minutes_zero_pending():
assert compute_eta_minutes(0, 10, 4) == 0
def test_build_summarize_eta_pending_includes_deferred():
stats = {"summarize": _stage(pending=12, deferred_pending=5, done_1h=8, created_1h=2)}
eta = build_summarize_eta(stats)
assert eta == {
"pending": 12, # 보류 포함 총수 (pending 자체에 deferred 포함)
"done_rate_1h": 8,
"inflow_rate_1h": 2,
"eta_minutes": round(12 / 6 * 60),
}
def test_build_summarize_eta_empty_stats():
eta = build_summarize_eta({})
assert eta == {"pending": 0, "done_rate_1h": 0, "inflow_rate_1h": 0, "eta_minutes": None}
# ─── trend 24h ────────────────────────────────────────────────────────────────
def test_trend_24_buckets_oldest_first_with_gaps():
now_kst = datetime(2026, 6, 11, 14, 30, tzinfo=KST)
inflow = {"2026-06-11 13:00": 3, "2026-06-10 15:00": 1} # 15:00 어제 = 최고령 버킷
done = {"2026-06-11 14:00": 2}
trend = build_trend(inflow, done, now_kst)
assert len(trend) == 24
assert trend[0] == {"hour": "15:00", "inflow": 1, "done": 0} # 오래된 것부터
assert trend[-1] == {"hour": "14:00", "inflow": 0, "done": 2} # 현재 시각 버킷
assert trend[-2] == {"hour": "13:00", "inflow": 3, "done": 0}
# 빈 버킷은 0
assert sum(b["inflow"] for b in trend) == 4
assert sum(b["done"] for b in trend) == 2
def test_trend_ignores_out_of_window_bucket():
"""창 밖(24버킷 미포함) key 는 무시 — cutoff 경계 행이 섞여도 안전."""
now_kst = datetime(2026, 6, 11, 14, 30, tzinfo=KST)
inflow = {"2026-06-10 14:00": 99} # 14:00 어제 — 창의 최고령(15:00 어제) 이전
trend = build_trend(inflow, {}, now_kst)
assert sum(b["inflow"] for b in trend) == 0
def test_trend_kst_midnight_crossing_labels():
now_kst = datetime(2026, 6, 11, 2, 5, tzinfo=KST)
trend = build_trend({}, {}, now_kst)
assert trend[-1]["hour"] == "02:00"
assert trend[0]["hour"] == "03:00" # 전날 03:00 (라벨은 HH:00 만)
assert [b["hour"] for b in trend[-3:]] == ["00:00", "01:00", "02:00"]
# ─── totals / row 변환 / 전체 조립 ───────────────────────────────────────────
def test_totals_sum_all_stages():
stats = {
"extract": _stage(pending=1, processing=2, failed=3),
"summarize": _stage(pending=4, failed=1),
"deep_summary": _stage(pending=2),
}
assert build_totals(stats) == {"pending": 7, "processing": 2, "failed": 4}
def test_rows_to_stage_stats_conversion():
rows = [
("extract", 3, 1, 0, 5, 9, 1, 0, 2),
("summarize", 7, None, 1, 10, 20, 0, 2, 4), # None 방어
]
stats = rows_to_stage_stats(rows)
assert stats["extract"]["pending"] == 3 and stats["extract"]["created_1h"] == 2
assert stats["summarize"]["processing"] == 0
assert stats["summarize"]["deferred_pending"] == 2
def test_rows_to_summarize_split_conversion():
rows = [
(True, 4, 8, 1), # is_macbook
(False, 6, 12, 0),
]
split = rows_to_summarize_split(rows)
assert split["macbook"] == {"done_1h": 4, "done_today": 8, "done_15m": 1}
assert split["macmini"] == {"done_1h": 6, "done_today": 12, "done_15m": 0}
def test_rows_to_summarize_split_empty():
split = rows_to_summarize_split([])
assert split["macbook"]["done_1h"] == 0 and split["macmini"]["done_today"] == 0
def test_compose_overview_contract_shape():
"""응답 dict 의 키가 FE 계약 shape 과 정확히 일치하는지 고정."""
out = compose_overview(
{"summarize": _stage(pending=1)},
_split(),
{}, {}, [],
deep_enabled=True,
now_kst=datetime(2026, 6, 11, 14, 30, tzinfo=KST),
)
assert set(out.keys()) == {"machines", "stages", "summarize_eta", "trend_24h", "totals"}
assert [m["key"] for m in out["machines"]] == ["gpu", "macmini", "macbook"]
for m in out["machines"]:
assert set(m.keys()) == {
"key", "label", "state", "stages", "pending", "processing", "failed",
"done_1h", "done_today", "deferred_pending", "current",
}
assert m["state"] in ("active", "deferred", "idle")
assert set(out["summarize_eta"].keys()) == {"pending", "done_rate_1h", "inflow_rate_1h", "eta_minutes"}
assert len(out["trend_24h"]) == 24
assert set(out["trend_24h"][0].keys()) == {"hour", "inflow", "done"}
assert set(out["totals"].keys()) == {"pending", "processing", "failed"}
# 머신 label 고정 (raw 모델명 노출 금지 — label 만)
assert [m["label"] for m in out["machines"]] == ["GPU 서버", "맥미니", "맥북 M5 Max"]
# ─── build_stages (단계별 현황 — 2026-06-11 사용자 피드백: 완료 가시화) ──────
def test_build_stages_order_fields_and_age():
from datetime import timedelta, timezone
from services.queue_overview import build_stages
now = datetime(2026, 6, 11, 14, 0, tzinfo=timezone.utc)
stats = {
"summarize": {**_stage(pending=5, done_today=12),
"oldest_pending_at": now - timedelta(hours=4)},
"extract": _stage(failed=2),
}
rows = build_stages(stats, now=now)
by = {r["stage"]: r for r in rows}
# 파이프라인 순서: extract 가 summarize 보다 앞
assert rows[0]["stage"] == "extract"
assert by["summarize"]["pending"] == 5
assert by["summarize"]["done_today"] == 12
assert by["summarize"]["oldest_pending_age_sec"] == 4 * 3600
assert by["extract"]["failed"] == 2
assert by["extract"]["oldest_pending_age_sec"] is None
# 전 stage 행 존재 (빈 단계 숨김은 FE 몫)
assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h",
"done_today", "oldest_pending_age_sec"} == set(rows[0].keys())
def test_build_stages_exposes_rates():
"""ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료."""
from services.queue_overview import build_stages
stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)}
rows = build_stages(stats)
embed = next(r for r in rows if r["stage"] == "embed")
assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)