diff --git a/app/api/queue_overview.py b/app/api/queue_overview.py
index d456dcb..c98e130 100644
--- a/app/api/queue_overview.py
+++ b/app/api/queue_overview.py
@@ -1,20 +1,30 @@
-"""처리 머신 보드 API — GET /api/queue/overview (plan ds-processing-ui-6an).
+"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
-홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계 로직은
-services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와 계약 고정.
-응답에 raw 모델명 노출 금지 — 머신 label 만.
+- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
+ 로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
+ 계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
+ FE 정적 맵 책임).
+- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
+ (자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
+ id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
"""
+from datetime import datetime
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from models.user import User
-from services.queue_overview import build_overview
+from services.queue_overview import (
+ build_overview,
+ fetch_failed_items,
+ retry_failed,
+ skip_failed,
+)
router = APIRouter()
@@ -64,11 +74,17 @@ class Totals(BaseModel):
class StageRow(BaseModel):
- """단계별 현황 행 — '단계 상세' 패널용 (완료 가시화)."""
+ """단계별 현황 행 — 흐름 노드/상세 패널용.
+
+ done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
+ ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
+ """
stage: str
pending: int
processing: int
failed: int
+ done_1h: int
+ created_1h: int
done_today: int
oldest_pending_age_sec: int | None
@@ -81,6 +97,40 @@ class QueueOverviewResponse(BaseModel):
totals: Totals
+class FailedItem(BaseModel):
+ """영구 실패 행 — 실패 드로어 표시 단위."""
+ id: int
+ stage: str
+ document_id: int
+ title: str
+ attempts: int
+ max_attempts: int
+ error_message: str | None
+ failed_at: datetime | None
+
+
+class FailedListResponse(BaseModel):
+ items: list[FailedItem]
+ total: int
+
+
+class QueueActionRequest(BaseModel):
+ """재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
+ ids: list[int] = Field(min_length=1, max_length=300)
+
+
+class RetryResponse(BaseModel):
+ requested: int
+ retried: int
+ not_retried: int
+
+
+class SkipResponse(BaseModel):
+ requested: int
+ skipped: int
+ not_skipped: int
+
+
@router.get("/overview", response_model=QueueOverviewResponse)
async def get_queue_overview(
user: Annotated[User, Depends(get_current_user)],
@@ -88,3 +138,40 @@ async def get_queue_overview(
):
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
return QueueOverviewResponse.model_validate(await build_overview(session))
+
+
+@router.get("/failed", response_model=FailedListResponse)
+async def get_failed_items(
+ user: Annotated[User, Depends(get_current_user)],
+ session: Annotated[AsyncSession, Depends(get_session)],
+):
+ """영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
+ items = await fetch_failed_items(session)
+ return FailedListResponse(
+ items=[FailedItem.model_validate(i) for i in items],
+ total=len(items),
+ )
+
+
+@router.post("/retry", response_model=RetryResponse)
+async def retry_failed_items(
+ body: QueueActionRequest,
+ user: Annotated[User, Depends(get_current_user)],
+ session: Annotated[AsyncSession, Depends(get_session)],
+):
+ """실패 행 재시도 — attempts 리셋 + pending 복귀.
+
+ not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
+ 이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
+ """
+ return RetryResponse.model_validate(await retry_failed(session, body.ids))
+
+
+@router.post("/skip", response_model=SkipResponse)
+async def skip_failed_items(
+ body: QueueActionRequest,
+ user: Annotated[User, Depends(get_current_user)],
+ session: Annotated[AsyncSession, Depends(get_session)],
+):
+ """실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
+ return SkipResponse.model_validate(await skip_failed(session, body.ids))
diff --git a/app/services/queue_overview.py b/app/services/queue_overview.py
index 81789f7..c796600 100644
--- a/app/services/queue_overview.py
+++ b/app/services/queue_overview.py
@@ -22,7 +22,7 @@ from datetime import datetime, timedelta
from posixpath import basename
from zoneinfo import ZoneInfo
-from sqlalchemy import text
+from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
@@ -258,6 +258,8 @@ def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
"pending": st["pending"],
"processing": st["processing"],
"failed": st["failed"],
+ "done_1h": st["done_1h"],
+ "created_1h": st["created_1h"],
"done_today": st["done_today"],
"oldest_pending_age_sec": age,
})
@@ -408,3 +410,104 @@ async def build_overview(session: AsyncSession) -> dict:
deep_enabled=deep_enabled,
now_kst=now_kst,
)
+
+
+# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
+# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
+# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
+
+# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
+# started_at 을 시각 fallback 으로 쓴다.
+_FAILED_LIST_SQL = """
+ SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
+ q.error_message,
+ COALESCE(q.completed_at, q.started_at) AS failed_at,
+ d.title, d.original_filename, d.file_path
+ FROM processing_queue q
+ JOIN documents d ON d.id = q.document_id
+ WHERE q.status = 'failed'
+ ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
+ LIMIT 300
+"""
+
+# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
+# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
+# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
+# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
+_RETRY_SQL = """
+ UPDATE processing_queue q
+ SET status = 'pending', attempts = 0,
+ started_at = NULL, completed_at = NULL
+ WHERE q.id IN :ids
+ AND q.status = 'failed'
+ AND NOT EXISTS (
+ SELECT 1 FROM processing_queue p
+ WHERE p.document_id = q.document_id
+ AND p.stage = q.stage
+ AND p.status IN ('pending', 'processing')
+ AND p.id <> q.id
+ )
+ RETURNING q.id
+"""
+
+# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
+# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
+# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
+_SKIP_SQL = """
+ UPDATE processing_queue
+ SET status = 'completed', completed_at = NOW(),
+ payload = COALESCE(payload, '{}'::jsonb)
+ || jsonb_build_object('skipped_by_user', true,
+ 'skipped_at', NOW()::text)
+ WHERE id IN :ids AND status = 'failed'
+ RETURNING id
+"""
+
+
+async def fetch_failed_items(session: AsyncSession) -> list[dict]:
+ """영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
+ rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
+ return [
+ {
+ "id": r[0],
+ "stage": r[1],
+ "document_id": r[2],
+ "attempts": int(r[3] or 0),
+ "max_attempts": int(r[4] or 0),
+ "error_message": r[5],
+ "failed_at": r[6],
+ "title": display_title({
+ "document_id": r[2],
+ "title": r[7],
+ "original_filename": r[8],
+ "file_path": r[9],
+ }),
+ }
+ for r in rows
+ ]
+
+
+async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
+ """failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
+ unique_ids = list(set(ids))
+ stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
+ retried = (await session.execute(stmt, {"ids": unique_ids})).all()
+ await session.commit()
+ return {
+ "requested": len(unique_ids),
+ "retried": len(retried),
+ "not_retried": len(unique_ids) - len(retried),
+ }
+
+
+async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
+ """failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
+ unique_ids = list(set(ids))
+ stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
+ skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
+ await session.commit()
+ return {
+ "requested": len(unique_ids),
+ "skipped": len(skipped),
+ "not_skipped": len(unique_ids) - len(skipped),
+ }
diff --git a/frontend/src/lib/components/ProcessingFlowBoard.svelte b/frontend/src/lib/components/ProcessingFlowBoard.svelte
new file mode 100644
index 0000000..f2e80bf
--- /dev/null
+++ b/frontend/src/lib/components/ProcessingFlowBoard.svelte
@@ -0,0 +1,419 @@
+
+
+
+
+
+
처리 머신
+
+ {#if totalFailed > 0}
+
+ {/if}
+ {#if spark}
+
+ {/if}
+
+
+
+
+
+ {#each machineStrip as m (m.key)}
+
+
+ {m.meta?.label ?? m.label}
+ {m.meta?.model}
+ {formatRate(m.done_1h)}/h
+ {#if m.key === 'macbook' && m.deferred_pending > 0}
+ 보류 {m.deferred_pending}
+ {/if}
+
+ {/each}
+
+
+
+
+ {#each mainNodes as n, i (n.def.key)}
+ {#if i > 0}
+
→
+ {/if}
+
toggleNode(n.def.key)}
+ onkeydown={(e) => { if (e.key === 'Enter' || e.key === ' ') { e.preventDefault(); toggleNode(n.def.key); } }}
+ title="{n.def.label} — 클릭하면 상세"
+ >
+ {#if n.failed > 0}
+
+ {/if}
+
+ {MACHINE_META[n.def.machine].label} · {n.def.engine}
+
+
+ {n.def.label}
+ {#if n.processing > 0}
+
+ {/if}
+ {#if n.inflowDominant}
+ 유입 우세
+ {/if}
+
+
+
+ {formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()}
+ {#if n.etaMinutes != null && !n.inflowDominant && n.pending > 0}
+ · {etaShort(n.etaMinutes)}
+ {/if}
+
+
+ {/each}
+
+
+
+
+ {#each auxActive as n, i (n.def.key)}
+ {i > 0 ? ' · ' : '보조: '}{n.def.label}({n.def.engine}) 대기 {n.pending.toLocaleString()} · {formatRate(n.done1h)}/h{n.failed > 0 ? ` · 실패 ${n.failed}` : ''}
+ {/each}
+ {#if auxIdle.length > 0}
+ {auxActive.length > 0 ? ' — ' : ''}한가: {auxIdle.map((n) => n.def.label).join(' · ')}
+ {/if}
+ — 뉴스 등 일부 소스는 분류/추출을 건너뜀 (흐름 그림은 대표 경로)
+
+
+
+ {#if selectedNode}
+
+
+ {selectedNode.def.label} — {selectedNode.def.engine}
+ {selectedNode.def.sub} · {MACHINE_META[selectedNode.def.machine].label}
+
+
+
+
+ {#if selectedNode.perStage.length > 1}
+ {#each selectedNode.perStage as row (row.stage)}
+
+ {flowStageLabel(row.stage)}
+
+ 대기 {row.pending.toLocaleString()}
+ · {formatRate(row.done_1h)}/h · 오늘 {row.done_today.toLocaleString()}
+ {#if row.failed > 0}· 실패 {row.failed}{/if}
+
+
+ {/each}
+ {/if}
+
+ {#if selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600}
+ 가장 오래 기다린 항목 {formatAgeSec(selectedNode.oldestAgeSec)}
+ {/if}
+ {#each nodeCurrent(selectedNode.def) as c, i (c.document_id + c.stage)}
+ {i === 0 && !(selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600) ? '' : ' · '}지금: {c.title} ({flowStageLabel(c.stage)})
+ {/each}
+ {#if selectedNode.failed > 0}
+ ·
+ {/if}
+
+
+
+ {/if}
+
+
+ {#if failOpen}
+
+
+ 실패 처리
+ 영구 실패 {failItems.length}건 — 자동 재시도 3회 소진, 수동 조치 대기
+
+
+ {#if failLoading}
+
불러오는 중…
+ {:else if failItems.length === 0}
+
영구 실패 항목 없음
+ {:else}
+ {#each failGroups as g (g.key)}
+
+
+ {flowStageLabel(g.stage)} {g.items.length}건
+ {g.pattern}{g.items[0]?.error_message && g.items[0].error_message.length > 36 ? '…' : ''}
+
+ {#each expanded[g.key] ? g.items : g.items.slice(0, 4) as it (it.id)}
+
+ {it.title}
+ 시도 {it.attempts}/{it.max_attempts}
+ {it.error_message ?? ''}
+
+
+
+ {/each}
+ {#if g.items.length > 4 && !expanded[g.key]}
+
+ {/if}
+ {#if g.items.length > 1}
+
+
+
+
+ {/if}
+
+ {/each}
+
+ 재시도 = 시도 횟수 리셋 후 큐 재진입 (자동 재시도 3회 새로 부여) · 건너뛰기 = 이 단계 완료 처리(후속 단계 연쇄 없음, 감사 마킹) · 같은 오류가 반복되는 항목(빈 텍스트 등)은 건너뛰기 권장
+
+ {/if}
+
+ {/if}
+
+
+
diff --git a/frontend/src/lib/types/queue.ts b/frontend/src/lib/types/queue.ts
index d0eccd4..740251d 100644
--- a/frontend/src/lib/types/queue.ts
+++ b/frontend/src/lib/types/queue.ts
@@ -61,6 +61,10 @@ export interface QueueStageRow {
pending: number;
processing: number;
failed: number;
+ /** 최근 1시간 완료 — 노드 처리율·ETA 재료 (ds-board-engines-1) */
+ done_1h: number;
+ /** 최근 1시간 유입 — 유입 우세 판정 재료 (ds-board-engines-1) */
+ created_1h: number;
done_today: number;
oldest_pending_age_sec: number | null;
}
@@ -72,3 +76,33 @@ export interface QueueOverview {
stages: QueueStageRow[];
totals: QueueTotals;
}
+
+/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */
+
+export interface FailedItem {
+ id: number;
+ stage: string;
+ document_id: number;
+ title: string;
+ attempts: number;
+ max_attempts: number;
+ error_message: string | null;
+ failed_at: string | null;
+}
+
+export interface FailedListResponse {
+ items: FailedItem[];
+ total: number;
+}
+
+export interface RetryResponse {
+ requested: number;
+ retried: number;
+ not_retried: number;
+}
+
+export interface SkipResponse {
+ requested: number;
+ skipped: number;
+ not_skipped: number;
+}
diff --git a/frontend/src/lib/utils/queueDisplay.ts b/frontend/src/lib/utils/queueDisplay.ts
index 9310164..709d43f 100644
--- a/frontend/src/lib/utils/queueDisplay.ts
+++ b/frontend/src/lib/utils/queueDisplay.ts
@@ -36,3 +36,82 @@ export function etaPhrase(minutes: number): string {
const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
return `약 ${text}시간 후 소진 예상`;
}
+
+/** ETA 분 → 칩용 짧은 표기 ("약 4.6시간" / "약 12분") */
+export function etaShort(minutes: number): string {
+ if (minutes < 60) return `약 ${Math.max(1, Math.round(minutes))}분`;
+ const hours = minutes / 60;
+ const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10);
+ return `약 ${text}시간`;
+}
+
+/** 경과 초 → "N분 전 / N시간 전 / N일 전" */
+export function formatAgeSec(sec: number): string {
+ if (sec < 3600) return `${Math.max(1, Math.round(sec / 60))}분 전`;
+ if (sec < 86400) return `${Math.round(sec / 3600)}시간 전`;
+ return `${Math.round(sec / 86400)}일 전`;
+}
+
+/* ─── 흐름 보드 정적 매핑 (plan ds-board-engines-1) ───────────────────────────
+ * stage → 흐름 노드 / 엔진(모델) / 소속 머신. API 는 머신 label 과 단계 사실만
+ * 주고(raw 모델명 노출 금지 계약), 엔진·모델 표기는 여기 단일 지점이 책임진다.
+ * ★ 모델/엔진 교체 시 이 블록 1곳만 수정 (예: 맥미니 모델 스왑).
+ */
+
+export type FlowMachine = 'gpu' | 'macmini' | 'macbook';
+
+export interface FlowNodeDef {
+ key: string;
+ /** 노드 표시명 */
+ label: string;
+ /** 합산할 stage 키 (다중 = 같은 엔진 공유) */
+ stages: string[];
+ machine: FlowMachine;
+ /** 엔진/모델 표시명 (FE 정적 — 모델 교체 시 여기 수정) */
+ engine: string;
+ /** 보조 표기 (서비스/워커명) */
+ sub: string;
+}
+
+/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */
+export const FLOW_NODES: FlowNodeDef[] = [
+ { key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' },
+ { key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' },
+ { key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' },
+ { key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' },
+ { key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' },
+ { key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' },
+];
+
+/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */
+export const AUX_NODES: FlowNodeDef[] = [
+ { key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' },
+ { key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' },
+ { key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' },
+];
+
+/** 머신 스트립 메타 — 모델 표기 단일 지점 */
+export const MACHINE_META: Record = {
+ gpu: { label: 'GPU 서버', model: '특화 엔진' },
+ macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' },
+ macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' },
+};
+
+/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */
+export const FLOW_STAGE_LABEL: Record = {
+ extract: '추출',
+ classify: '분류',
+ summarize: '요약',
+ embed: '임베딩',
+ chunk: '청크',
+ preview: '미리보기',
+ stt: '전사',
+ thumbnail: '썸네일',
+ deep_summary: '심층분석',
+ markdown: '마크다운',
+ fulltext: '전문',
+};
+
+export function flowStageLabel(stage: string): string {
+ return FLOW_STAGE_LABEL[stage] ?? stage;
+}
diff --git a/frontend/src/routes/+page.svelte b/frontend/src/routes/+page.svelte
index d028892..451bdae 100644
--- a/frontend/src/routes/+page.svelte
+++ b/frontend/src/routes/+page.svelte
@@ -14,9 +14,7 @@
import { user } from '$lib/stores/auth';
import { api } from '$lib/api';
import { queueOverview, refreshQueueOverview } from '$lib/stores/queueOverview';
- import {
- MACHINE_STATE_LABEL, machineChipClass, machineDotClass, formatRate, etaPhrase,
- } from '$lib/utils/queueDisplay';
+ import ProcessingFlowBoard from '$lib/components/ProcessingFlowBoard.svelte';
import type { QueueOverview } from '$lib/types/queue';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
@@ -460,65 +458,9 @@
-
+
{#if queue}
-
-
처리 머신
-
- {#each queue.machines as m (m.key)}
-
-
-
-
-
- {m.label}
-
-
- {MACHINE_STATE_LABEL[m.state]}
-
-
-
- {#if m.stages.length > 0}
-
- {#each m.stages as s (s)}
- {queueStageLabel(s)}
- {/each}
-
- {/if}
-
-
- 대기 {m.pending.toLocaleString()}
- · 처리율 {formatRate(m.done_1h)}/h
- · 오늘 {m.done_today.toLocaleString()}건
-
-
- {#if m.key === 'macbook' && m.deferred_pending > 0}
-
보류 {m.deferred_pending.toLocaleString()}건 — 자동 재개 대기
- {/if}
-
- {#if m.current.length > 0}
-
`${c.title} (${queueStageLabel(c.stage)})`).join(' · ')}>
- 지금: {m.current[0].title} ({queueStageLabel(m.current[0].stage)}){m.current.length > 1 ? ` 외 ${m.current.length - 1}건` : ''}
-
- {/if}
-
- {/each}
-
-
-
-
- 요약 대기 {queue.summarize_eta.pending.toLocaleString()}건
- — 소화 {formatRate(queue.summarize_eta.done_rate_1h)}/h
- · 유입 {formatRate(queue.summarize_eta.inflow_rate_1h)}/h
- {#if queue.summarize_eta.eta_minutes != null}
- · {etaPhrase(queue.summarize_eta.eta_minutes)}
- {:else}
- · 유입 우세(백필 중)
- {/if}
-
-
+
{/if}
diff --git a/tests/test_queue_overview.py b/tests/test_queue_overview.py
index d385592..c24664e 100644
--- a/tests/test_queue_overview.py
+++ b/tests/test_queue_overview.py
@@ -379,5 +379,14 @@ def test_build_stages_order_fields_and_age():
assert by["extract"]["failed"] == 2
assert by["extract"]["oldest_pending_age_sec"] is None
# 전 stage 행 존재 (빈 단계 숨김은 FE 몫)
- assert {"stage", "pending", "processing", "failed", "done_today",
- "oldest_pending_age_sec"} == set(rows[0].keys())
+ assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h",
+ "done_today", "oldest_pending_age_sec"} == set(rows[0].keys())
+
+
+def test_build_stages_exposes_rates():
+ """ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료."""
+ from services.queue_overview import build_stages
+ stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)}
+ rows = build_stages(stats)
+ embed = next(r for r in rows if r["stage"] == "embed")
+ assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)