From 5581d3f1cedca9e5445c7c935dfe40a98044a24f Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 12 Jun 2026 01:05:04 +0000 Subject: [PATCH] =?UTF-8?q?feat(board):=20=EC=B2=98=EB=A6=AC=20=EB=B3=B4?= =?UTF-8?q?=EB=93=9C=20v2=20=E2=80=94=20=ED=8C=8C=EC=9D=B4=ED=94=84?= =?UTF-8?q?=EB=9D=BC=EC=9D=B8=20=ED=9D=90=EB=A6=84=20=EB=B7=B0=C2=B7?= =?UTF-8?q?=EC=97=94=EC=A7=84=20=EA=B5=AC=EB=B6=84=C2=B7=EC=8B=A4=ED=8C=A8?= =?UTF-8?q?=20=EC=9E=AC=EC=8B=9C=EB=8F=84/=EA=B1=B4=EB=84=88=EB=9B=B0?= =?UTF-8?q?=EA=B8=B0=20(ds-board-engines-1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 흐름 뷰 메인: 좌→우 노드(머신·엔진 태그, 유입 우세 amber, 실패 뱃지) + 머신 스트립(모델 표기) + trend_24h 스파크라인 첫 렌더 - 노드 클릭 상세 패널: KV 4칸 + 다중 stage 행 + 지금 처리 중 - 실패 처리 드로어: 에러 패턴 그룹 + 재시도/건너뛰기 (영구 실패의 첫 사용자 조치 경로) - API: stages[].done_1h/created_1h 노출 + GET /api/queue/failed + POST /api/queue/retry|/skip (uq_queue_active 충돌 skip, 건너뛰기는 enqueue_next_stage 미호출) - 엔진/모델 표기 = queueDisplay.ts 정적 맵 단일 지점 (모델 교체 시 1곳) Co-Authored-By: Claude Fable 5 --- app/api/queue_overview.py | 101 ++++- app/services/queue_overview.py | 105 ++++- .../lib/components/ProcessingFlowBoard.svelte | 419 ++++++++++++++++++ frontend/src/lib/types/queue.ts | 34 ++ frontend/src/lib/utils/queueDisplay.ts | 79 ++++ frontend/src/routes/+page.svelte | 64 +-- tests/test_queue_overview.py | 13 +- 7 files changed, 744 insertions(+), 71 deletions(-) create mode 100644 frontend/src/lib/components/ProcessingFlowBoard.svelte diff --git a/app/api/queue_overview.py b/app/api/queue_overview.py index d456dcb..c98e130 100644 --- a/app/api/queue_overview.py +++ b/app/api/queue_overview.py @@ -1,20 +1,30 @@ -"""처리 머신 보드 API — GET /api/queue/overview (plan ds-processing-ui-6an). +"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1). -홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계 로직은 -services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와 계약 고정. -응답에 raw 모델명 노출 금지 — 머신 label 만. +- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계 + 로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와 + 계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는 + FE 정적 맵 책임). +- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패 + (자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의 + id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신). """ +from datetime import datetime from typing import Annotated, Literal from fastapi import APIRouter, Depends -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from core.database import get_session from models.user import User -from services.queue_overview import build_overview +from services.queue_overview import ( + build_overview, + fetch_failed_items, + retry_failed, + skip_failed, +) router = APIRouter() @@ -64,11 +74,17 @@ class Totals(BaseModel): class StageRow(BaseModel): - """단계별 현황 행 — '단계 상세' 패널용 (완료 가시화).""" + """단계별 현황 행 — 흐름 노드/상세 패널용. + + done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료, + ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출). + """ stage: str pending: int processing: int failed: int + done_1h: int + created_1h: int done_today: int oldest_pending_age_sec: int | None @@ -81,6 +97,40 @@ class QueueOverviewResponse(BaseModel): totals: Totals +class FailedItem(BaseModel): + """영구 실패 행 — 실패 드로어 표시 단위.""" + id: int + stage: str + document_id: int + title: str + attempts: int + max_attempts: int + error_message: str | None + failed_at: datetime | None + + +class FailedListResponse(BaseModel): + items: list[FailedItem] + total: int + + +class QueueActionRequest(BaseModel): + """재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달).""" + ids: list[int] = Field(min_length=1, max_length=300) + + +class RetryResponse(BaseModel): + requested: int + retried: int + not_retried: int + + +class SkipResponse(BaseModel): + requested: int + skipped: int + not_skipped: int + + @router.get("/overview", response_model=QueueOverviewResponse) async def get_queue_overview( user: Annotated[User, Depends(get_current_user)], @@ -88,3 +138,40 @@ async def get_queue_overview( ): """머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)""" return QueueOverviewResponse.model_validate(await build_overview(session)) + + +@router.get("/failed", response_model=FailedListResponse) +async def get_failed_items( + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """영구 실패 행 목록 (문서 제목 포함, 최대 300건)""" + items = await fetch_failed_items(session) + return FailedListResponse( + items=[FailedItem.model_validate(i) for i in items], + total=len(items), + ) + + +@router.post("/retry", response_model=RetryResponse) +async def retry_failed_items( + body: QueueActionRequest, + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """실패 행 재시도 — attempts 리셋 + pending 복귀. + + not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는 + 이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고. + """ + return RetryResponse.model_validate(await retry_failed(session, body.ids)) + + +@router.post("/skip", response_model=SkipResponse) +async def skip_failed_items( + body: QueueActionRequest, + user: Annotated[User, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음""" + return SkipResponse.model_validate(await skip_failed(session, body.ids)) diff --git a/app/services/queue_overview.py b/app/services/queue_overview.py index 81789f7..c796600 100644 --- a/app/services/queue_overview.py +++ b/app/services/queue_overview.py @@ -22,7 +22,7 @@ from datetime import datetime, timedelta from posixpath import basename from zoneinfo import ZoneInfo -from sqlalchemy import text +from sqlalchemy import bindparam, text from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings @@ -258,6 +258,8 @@ def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]: "pending": st["pending"], "processing": st["processing"], "failed": st["failed"], + "done_1h": st["done_1h"], + "created_1h": st["created_1h"], "done_today": st["done_today"], "oldest_pending_age_sec": age, }) @@ -408,3 +410,104 @@ async def build_overview(session: AsyncSession) -> dict: deep_enabled=deep_enabled, now_kst=now_kst, ) + + +# ─── 실패 처리 (plan ds-board-engines-1) ───────────────────────────────────── +# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은 +# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자). + +# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록) +# started_at 을 시각 fallback 으로 쓴다. +_FAILED_LIST_SQL = """ + SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts, + q.error_message, + COALESCE(q.completed_at, q.started_at) AS failed_at, + d.title, d.original_filename, d.file_path + FROM processing_queue q + JOIN documents d ON d.id = q.document_id + WHERE q.status = 'failed' + ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST + LIMIT 300 +""" + +# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여). +# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해. +# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 — +# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고. +_RETRY_SQL = """ + UPDATE processing_queue q + SET status = 'pending', attempts = 0, + started_at = NULL, completed_at = NULL + WHERE q.id IN :ids + AND q.status = 'failed' + AND NOT EXISTS ( + SELECT 1 FROM processing_queue p + WHERE p.document_id = q.document_id + AND p.stage = q.stage + AND p.status IN ('pending', 'processing') + AND p.id <> q.id + ) + RETURNING q.id +""" + +# 건너뛰기: failed → completed + payload 마킹 (감사 추적). +# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가 +# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로. +_SKIP_SQL = """ + UPDATE processing_queue + SET status = 'completed', completed_at = NOW(), + payload = COALESCE(payload, '{}'::jsonb) + || jsonb_build_object('skipped_by_user', true, + 'skipped_at', NOW()::text) + WHERE id IN :ids AND status = 'failed' + RETURNING id +""" + + +async def fetch_failed_items(session: AsyncSession) -> list[dict]: + """영구 실패 행 목록 (문서 제목 포함, 최대 300건).""" + rows = (await session.execute(text(_FAILED_LIST_SQL))).all() + return [ + { + "id": r[0], + "stage": r[1], + "document_id": r[2], + "attempts": int(r[3] or 0), + "max_attempts": int(r[4] or 0), + "error_message": r[5], + "failed_at": r[6], + "title": display_title({ + "document_id": r[2], + "title": r[7], + "original_filename": r[8], + "file_path": r[9], + }), + } + for r in rows + ] + + +async def retry_failed(session: AsyncSession, ids: list[int]) -> dict: + """failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님.""" + unique_ids = list(set(ids)) + stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True)) + retried = (await session.execute(stmt, {"ids": unique_ids})).all() + await session.commit() + return { + "requested": len(unique_ids), + "retried": len(retried), + "not_retried": len(unique_ids) - len(retried), + } + + +async def skip_failed(session: AsyncSession, ids: list[int]) -> dict: + """failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음.""" + unique_ids = list(set(ids)) + stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True)) + skipped = (await session.execute(stmt, {"ids": unique_ids})).all() + await session.commit() + return { + "requested": len(unique_ids), + "skipped": len(skipped), + "not_skipped": len(unique_ids) - len(skipped), + } diff --git a/frontend/src/lib/components/ProcessingFlowBoard.svelte b/frontend/src/lib/components/ProcessingFlowBoard.svelte new file mode 100644 index 0000000..f2e80bf --- /dev/null +++ b/frontend/src/lib/components/ProcessingFlowBoard.svelte @@ -0,0 +1,419 @@ + + +
+ +
+
처리 머신
+
+ {#if totalFailed > 0} + + {/if} + {#if spark} +
+ + + + + 요약 24h 유입/소화 +
+ {/if} +
+
+ + +
+ {#each machineStrip as m (m.key)} +
+ + {m.meta?.label ?? m.label} + {m.meta?.model} + {formatRate(m.done_1h)}/h + {#if m.key === 'macbook' && m.deferred_pending > 0} + 보류 {m.deferred_pending} + {/if} +
+ {/each} +
+ + +
+ {#each mainNodes as n, i (n.def.key)} + {#if i > 0} + + {/if} +
toggleNode(n.def.key)} + onkeydown={(e) => { if (e.key === 'Enter' || e.key === ' ') { e.preventDefault(); toggleNode(n.def.key); } }} + title="{n.def.label} — 클릭하면 상세" + > + {#if n.failed > 0} + + {/if} + + {MACHINE_META[n.def.machine].label} · {n.def.engine} + +
+ {n.def.label} + {#if n.processing > 0} + + {/if} + {#if n.inflowDominant} + 유입 우세 + {/if} +
+
+ {n.pending.toLocaleString()} +
+
+ {formatRate(n.done1h)}/h · 오늘 {n.doneToday.toLocaleString()} + {#if n.etaMinutes != null && !n.inflowDominant && n.pending > 0} + · {etaShort(n.etaMinutes)} + {/if} +
+
+ {/each} +
+ + +

+ {#each auxActive as n, i (n.def.key)} + {i > 0 ? ' · ' : '보조: '}{n.def.label}({n.def.engine}) 대기 {n.pending.toLocaleString()} · {formatRate(n.done1h)}/h{n.failed > 0 ? ` · 실패 ${n.failed}` : ''} + {/each} + {#if auxIdle.length > 0} + {auxActive.length > 0 ? ' — ' : ''}한가: {auxIdle.map((n) => n.def.label).join(' · ')} + {/if} + — 뉴스 등 일부 소스는 분류/추출을 건너뜀 (흐름 그림은 대표 경로) +

+ + + {#if selectedNode} +
+
+ {selectedNode.def.label} — {selectedNode.def.engine} + {selectedNode.def.sub} · {MACHINE_META[selectedNode.def.machine].label} + +
+
+
+
+
대기
+
{selectedNode.pending.toLocaleString()}
+
+
+
처리율 (1h)
+
{formatRate(selectedNode.done1h)}/h
+
+
+
오늘 완료
+
{selectedNode.doneToday.toLocaleString()}
+
+
+
소진 예상
+
+ {#if selectedNode.inflowDominant}유입 우세{:else if selectedNode.etaMinutes != null}{etaShort(selectedNode.etaMinutes)}{:else if selectedNode.pending === 0}한가{:else}—{/if} +
+
+
+ {#if selectedNode.perStage.length > 1} + {#each selectedNode.perStage as row (row.stage)} +
+ {flowStageLabel(row.stage)} + + 대기 {row.pending.toLocaleString()} + · {formatRate(row.done_1h)}/h · 오늘 {row.done_today.toLocaleString()} + {#if row.failed > 0}· 실패 {row.failed}{/if} + +
+ {/each} + {/if} +
+ {#if selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600} + 가장 오래 기다린 항목 {formatAgeSec(selectedNode.oldestAgeSec)} + {/if} + {#each nodeCurrent(selectedNode.def) as c, i (c.document_id + c.stage)} + {i === 0 && !(selectedNode.oldestAgeSec != null && selectedNode.oldestAgeSec > 600) ? '' : ' · '}지금: {c.title} ({flowStageLabel(c.stage)}) + {/each} + {#if selectedNode.failed > 0} + · + {/if} +
+
+
+ {/if} + + + {#if failOpen} +
+
+ 실패 처리 + 영구 실패 {failItems.length}건 — 자동 재시도 3회 소진, 수동 조치 대기 + +
+ {#if failLoading} +

불러오는 중…

+ {:else if failItems.length === 0} +

영구 실패 항목 없음

+ {:else} + {#each failGroups as g (g.key)} +
+
+ {flowStageLabel(g.stage)} {g.items.length}건 + {g.pattern}{g.items[0]?.error_message && g.items[0].error_message.length > 36 ? '…' : ''} +
+ {#each expanded[g.key] ? g.items : g.items.slice(0, 4) as it (it.id)} +
+ {it.title} + 시도 {it.attempts}/{it.max_attempts} + {it.error_message ?? ''} + + +
+ {/each} + {#if g.items.length > 4 && !expanded[g.key]} + + {/if} + {#if g.items.length > 1} +
+ + +
+ {/if} +
+ {/each} +

+ 재시도 = 시도 횟수 리셋 후 큐 재진입 (자동 재시도 3회 새로 부여) · 건너뛰기 = 이 단계 완료 처리(후속 단계 연쇄 없음, 감사 마킹) · 같은 오류가 반복되는 항목(빈 텍스트 등)은 건너뛰기 권장 +

+ {/if} +
+ {/if} +
+ + diff --git a/frontend/src/lib/types/queue.ts b/frontend/src/lib/types/queue.ts index d0eccd4..740251d 100644 --- a/frontend/src/lib/types/queue.ts +++ b/frontend/src/lib/types/queue.ts @@ -61,6 +61,10 @@ export interface QueueStageRow { pending: number; processing: number; failed: number; + /** 최근 1시간 완료 — 노드 처리율·ETA 재료 (ds-board-engines-1) */ + done_1h: number; + /** 최근 1시간 유입 — 유입 우세 판정 재료 (ds-board-engines-1) */ + created_1h: number; done_today: number; oldest_pending_age_sec: number | null; } @@ -72,3 +76,33 @@ export interface QueueOverview { stages: QueueStageRow[]; totals: QueueTotals; } + +/** ─── 실패 처리 (ds-board-engines-1) — GET /api/queue/failed · POST /retry|/skip ─── */ + +export interface FailedItem { + id: number; + stage: string; + document_id: number; + title: string; + attempts: number; + max_attempts: number; + error_message: string | null; + failed_at: string | null; +} + +export interface FailedListResponse { + items: FailedItem[]; + total: number; +} + +export interface RetryResponse { + requested: number; + retried: number; + not_retried: number; +} + +export interface SkipResponse { + requested: number; + skipped: number; + not_skipped: number; +} diff --git a/frontend/src/lib/utils/queueDisplay.ts b/frontend/src/lib/utils/queueDisplay.ts index 9310164..709d43f 100644 --- a/frontend/src/lib/utils/queueDisplay.ts +++ b/frontend/src/lib/utils/queueDisplay.ts @@ -36,3 +36,82 @@ export function etaPhrase(minutes: number): string { const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10); return `약 ${text}시간 후 소진 예상`; } + +/** ETA 분 → 칩용 짧은 표기 ("약 4.6시간" / "약 12분") */ +export function etaShort(minutes: number): string { + if (minutes < 60) return `약 ${Math.max(1, Math.round(minutes))}분`; + const hours = minutes / 60; + const text = hours >= 10 ? String(Math.round(hours)) : String(Math.round(hours * 10) / 10); + return `약 ${text}시간`; +} + +/** 경과 초 → "N분 전 / N시간 전 / N일 전" */ +export function formatAgeSec(sec: number): string { + if (sec < 3600) return `${Math.max(1, Math.round(sec / 60))}분 전`; + if (sec < 86400) return `${Math.round(sec / 3600)}시간 전`; + return `${Math.round(sec / 86400)}일 전`; +} + +/* ─── 흐름 보드 정적 매핑 (plan ds-board-engines-1) ─────────────────────────── + * stage → 흐름 노드 / 엔진(모델) / 소속 머신. API 는 머신 label 과 단계 사실만 + * 주고(raw 모델명 노출 금지 계약), 엔진·모델 표기는 여기 단일 지점이 책임진다. + * ★ 모델/엔진 교체 시 이 블록 1곳만 수정 (예: 맥미니 모델 스왑). + */ + +export type FlowMachine = 'gpu' | 'macmini' | 'macbook'; + +export interface FlowNodeDef { + key: string; + /** 노드 표시명 */ + label: string; + /** 합산할 stage 키 (다중 = 같은 엔진 공유) */ + stages: string[]; + machine: FlowMachine; + /** 엔진/모델 표시명 (FE 정적 — 모델 교체 시 여기 수정) */ + engine: string; + /** 보조 표기 (서비스/워커명) */ + sub: string; +} + +/** 메인 흐름 (문서 진행 순서). 뉴스 등 소스별 스킵 경로는 그림에 안 그림 — 단순화 한계. */ +export const FLOW_NODES: FlowNodeDef[] = [ + { key: 'extract', label: '추출', stages: ['extract'], machine: 'gpu', engine: 'Surya OCR', sub: 'ocr-service' }, + { key: 'markdown', label: '마크다운', stages: ['markdown'], machine: 'gpu', engine: 'Marker', sub: 'marker-service' }, + { key: 'classify', label: '분류', stages: ['classify'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'classify + triage' }, + { key: 'summarize', label: '요약', stages: ['summarize'], machine: 'macmini', engine: 'Qwen3.6-27B', sub: 'summarize' }, + { key: 'chunkembed', label: '청크 · 임베딩', stages: ['chunk', 'embed'], machine: 'gpu', engine: 'TEI bge-m3', sub: 'text-embeddings-inference' }, + { key: 'deep', label: '심층분석', stages: ['deep_summary'], machine: 'macbook', engine: 'Qwen3.6-27B', sub: 'deep_summary' }, +]; + +/** 보조 노드 — 메인 흐름 밖 (활동 있을 때만 보조 라인에 표시) */ +export const AUX_NODES: FlowNodeDef[] = [ + { key: 'fulltext', label: '전문 수집', stages: ['fulltext'], machine: 'gpu', engine: 'Playwright', sub: 'playwright-fetcher' }, + { key: 'stt', label: '전사', stages: ['stt'], machine: 'gpu', engine: 'Whisper', sub: 'stt-service' }, + { key: 'util', label: '미리보기 · 썸네일', stages: ['preview', 'thumbnail'], machine: 'gpu', engine: '유틸', sub: 'ffmpeg' }, +]; + +/** 머신 스트립 메타 — 모델 표기 단일 지점 */ +export const MACHINE_META: Record = { + gpu: { label: 'GPU 서버', model: '특화 엔진' }, + macmini: { label: '맥미니', model: 'Qwen3.6-27B-6bit · 24/7' }, + macbook: { label: '맥북 M5 Max', model: 'Qwen3.6-27B · 야간 drain' }, +}; + +/** 흐름 보드 단계 라벨 (드로어/상세 행 표기) */ +export const FLOW_STAGE_LABEL: Record = { + extract: '추출', + classify: '분류', + summarize: '요약', + embed: '임베딩', + chunk: '청크', + preview: '미리보기', + stt: '전사', + thumbnail: '썸네일', + deep_summary: '심층분석', + markdown: '마크다운', + fulltext: '전문', +}; + +export function flowStageLabel(stage: string): string { + return FLOW_STAGE_LABEL[stage] ?? stage; +} diff --git a/frontend/src/routes/+page.svelte b/frontend/src/routes/+page.svelte index d028892..451bdae 100644 --- a/frontend/src/routes/+page.svelte +++ b/frontend/src/routes/+page.svelte @@ -14,9 +14,7 @@ import { user } from '$lib/stores/auth'; import { api } from '$lib/api'; import { queueOverview, refreshQueueOverview } from '$lib/stores/queueOverview'; - import { - MACHINE_STATE_LABEL, machineChipClass, machineDotClass, formatRate, etaPhrase, - } from '$lib/utils/queueDisplay'; + import ProcessingFlowBoard from '$lib/components/ProcessingFlowBoard.svelte'; import type { QueueOverview } from '$lib/types/queue'; import EmptyState from '$lib/components/ui/EmptyState.svelte'; import Skeleton from '$lib/components/ui/Skeleton.svelte'; @@ -460,65 +458,9 @@ - + {#if queue} -
-
처리 머신
-
- {#each queue.machines as m (m.key)} -
- -
- - - {m.label} - - - {MACHINE_STATE_LABEL[m.state]} - -
- - {#if m.stages.length > 0} -
- {#each m.stages as s (s)} - {queueStageLabel(s)} - {/each} -
- {/if} - -
- 대기 {m.pending.toLocaleString()} - · 처리율 {formatRate(m.done_1h)}/h - · 오늘 {m.done_today.toLocaleString()}건 -
- - {#if m.key === 'macbook' && m.deferred_pending > 0} -
보류 {m.deferred_pending.toLocaleString()}건 — 자동 재개 대기
- {/if} - - {#if m.current.length > 0} -
`${c.title} (${queueStageLabel(c.stage)})`).join(' · ')}> - 지금: {m.current[0].title} ({queueStageLabel(m.current[0].stage)}){m.current.length > 1 ? ` 외 ${m.current.length - 1}건` : ''} -
- {/if} -
- {/each} -
- - -
- 요약 대기 {queue.summarize_eta.pending.toLocaleString()}건 - — 소화 {formatRate(queue.summarize_eta.done_rate_1h)}/h - · 유입 {formatRate(queue.summarize_eta.inflow_rate_1h)}/h - {#if queue.summarize_eta.eta_minutes != null} - · {etaPhrase(queue.summarize_eta.eta_minutes)} - {:else} - · 유입 우세(백필 중) - {/if} -
-
+ {/if} diff --git a/tests/test_queue_overview.py b/tests/test_queue_overview.py index d385592..c24664e 100644 --- a/tests/test_queue_overview.py +++ b/tests/test_queue_overview.py @@ -379,5 +379,14 @@ def test_build_stages_order_fields_and_age(): assert by["extract"]["failed"] == 2 assert by["extract"]["oldest_pending_age_sec"] is None # 전 stage 행 존재 (빈 단계 숨김은 FE 몫) - assert {"stage", "pending", "processing", "failed", "done_today", - "oldest_pending_age_sec"} == set(rows[0].keys()) + assert {"stage", "pending", "processing", "failed", "done_1h", "created_1h", + "done_today", "oldest_pending_age_sec"} == set(rows[0].keys()) + + +def test_build_stages_exposes_rates(): + """ds-board-engines-1: done_1h/created_1h 노출 — 흐름 노드 처리율·ETA·유입 우세 재료.""" + from services.queue_overview import build_stages + stats = {"embed": _stage(pending=4, done_1h=600, created_1h=120, done_today=900)} + rows = build_stages(stats) + embed = next(r for r in rows if r["stage"] == "embed") + assert (embed["done_1h"], embed["created_1h"], embed["done_today"]) == (600, 120, 900)