feat(ui): 처리 머신 보드 — 누가 일하나 (안2) + ETA·전 페이지 스트립/드로어 (안5/6 라이트)

plan ds-processing-ui-6an (시안 choice 채택: 안2 1차 + 안5/6 지원):
- GET /api/queue/overview — 머신(GPU/맥미니/맥북) 귀속 라이브 집계 5쿼리, 마이그레이션 0.
  summarize 풀 완료 실적은 documents.ai_model_version 조인으로 맥북/맥미니 분리,
  보류(deferred_until)=맥북 카드 귀속, state=active/deferred/idle. raw 모델명 비노출
- 홈: 처리 머신 보드(3열 카드 + 지금 처리 중 제목) + ETA 라인(유입 우세 시 null 명시),
  기존 stage 테이블은 details 접힘으로 강등 (구조 개편)
- 전 페이지: 상태 스트립(처리중·대기·실패·맥북 칩) + 우측 드로어(QueueDrawer,
  dialog a11y) — 공유 60s 폴링 store, 경량 fetch(401 강제 logout 부수효과 회피)
- tests: 판정부 30건 (귀속/풀 분리/state 9케이스/ETA 경계/trend 버킷/계약 shape)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-11 14:13:35 +09:00
parent 01db4816fd
commit 468804494d
11 changed files with 1205 additions and 5 deletions
+375
View File
@@ -0,0 +1,375 @@
"""처리 머신 보드 + ETA 집계 (plan ds-processing-ui-6an, 안2+안5/6).
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리.
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
귀속 규칙 (단일 진실):
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/
fulltext/stt · macmini = classify/summarize · macbook = deep_summary
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속).
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook'
이면 macbook 실적, 아니면 macmini 실적.
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
(보류 = 맥북 불가 신호).
"""
from datetime import datetime, timedelta
from posixpath import basename
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
KST = ZoneInfo("Asia/Seoul")
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
_GPU_STAGES = (
"extract", "embed", "chunk", "markdown",
"preview", "thumbnail", "fulltext", "stt",
)
_MACMINI_STAGES = ("classify", "summarize")
_MACBOOK_STAGES = ("deep_summary",)
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
_MACHINE_KEYS = ("gpu", "macmini", "macbook")
_MACHINE_LABELS = {
"gpu": "GPU 서버",
"macmini": "맥미니",
"macbook": "맥북 M5 Max",
}
# 머신 카드당 current 표시 상한
_CURRENT_LIMIT = 2
def stage_machine_map(deep_enabled: bool) -> dict[str, str]:
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속."""
mapping: dict[str, str] = {}
for s in _GPU_STAGES:
mapping[s] = "gpu"
for s in _MACMINI_STAGES:
mapping[s] = "macmini"
for s in _MACBOOK_STAGES:
mapping[s] = "macbook" if deep_enabled else "macmini"
return mapping
def _zero_stage() -> dict:
return {
"pending": 0, "processing": 0, "failed": 0,
"done_1h": 0, "done_today": 0, "done_15m": 0,
"deferred_pending": 0, "created_1h": 0,
}
def rows_to_stage_stats(rows) -> dict[str, dict]:
"""stage×status 집계 쿼리 행 → {stage: {pending, ..., created_1h}} 변환."""
stats: dict[str, dict] = {}
for row in rows:
stats[row[0]] = {
"pending": int(row[1] or 0),
"processing": int(row[2] or 0),
"failed": int(row[3] or 0),
"done_1h": int(row[4] or 0),
"done_today": int(row[5] or 0),
"done_15m": int(row[6] or 0),
"deferred_pending": int(row[7] or 0),
"created_1h": int(row[8] or 0),
}
return stats
def rows_to_summarize_split(rows) -> dict[str, dict]:
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
"""
split = {
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
}
for row in rows:
key = "macbook" if row[0] else "macmini"
split[key]["done_1h"] += int(row[1] or 0)
split[key]["done_today"] += int(row[2] or 0)
split[key]["done_15m"] += int(row[3] or 0)
return split
def display_title(row: dict) -> str:
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
if row.get("title"):
return row["title"]
if row.get("original_filename"):
return row["original_filename"]
if row.get("file_path"):
return basename(row["file_path"].rstrip("/"))
return f"문서 #{row['document_id']}"
def build_machines(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
current_rows: list[dict],
*,
deep_enabled: bool,
) -> list[dict]:
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부."""
smap = stage_machine_map(deep_enabled)
def g(stage: str, field: str) -> int:
return stage_stats.get(stage, {}).get(field, 0)
# current 귀속: processing 행을 머신별 최대 2건 (summarize processing → macmini)
current_by_machine: dict[str, list[dict]] = {k: [] for k in _MACHINE_KEYS}
for row in current_rows:
machine = smap.get(row["stage"])
if machine and len(current_by_machine[machine]) < _CURRENT_LIMIT:
current_by_machine[machine].append({
"document_id": row["document_id"],
"title": display_title(row),
"stage": row["stage"],
})
machines = []
for key in _MACHINE_KEYS:
stages = [s for s in _STAGE_ORDER if smap[s] == key]
pending = sum(g(s, "pending") for s in stages)
processing = sum(g(s, "processing") for s in stages)
failed = sum(g(s, "failed") for s in stages)
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
if key in summarize_split:
done_1h += summarize_split[key]["done_1h"]
done_today += summarize_split[key]["done_today"]
done_15m += summarize_split[key]["done_15m"]
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
deferred_pending = (
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
if key == "macbook" else 0
)
# state 판정 — macbook 의 done_15m 은 위에서 deep_summary stage 분 + summarize 풀
# split 분이 이미 합산돼 있고, processing 은 deep_summary 진행 중(=맥북 생성 중)을
# 포함하므로 동일 분기로 충분 (검수 보정 2026-06-11: deep 처리 중에도 active).
if key == "macbook" and deferred_pending > 0:
state = "deferred"
else:
state = "active" if (processing > 0 or done_15m > 0) else "idle"
machines.append({
"key": key,
"label": _MACHINE_LABELS[key],
"state": state,
"stages": stages,
"pending": pending,
"processing": processing,
"failed": failed,
"done_1h": done_1h,
"done_today": done_today,
"deferred_pending": deferred_pending,
"current": current_by_machine[key],
})
return machines
def compute_eta_minutes(pending: int, done_1h: int, inflow_1h: int) -> int | None:
"""ETA(분) = 순소화율 기반. done > inflow 일 때만 산출, 아니면 None (소화 불가)."""
if done_1h > inflow_1h:
return round(pending / (done_1h - inflow_1h) * 60)
return None
def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
"""summarize 풀 ETA — pending 은 보류(deferred) 포함 총수."""
s = stage_stats.get("summarize", _zero_stage())
pending = s["pending"]
done_rate = s["done_1h"]
inflow_rate = s["created_1h"]
return {
"pending": pending,
"done_rate_1h": done_rate,
"inflow_rate_1h": inflow_rate,
"eta_minutes": compute_eta_minutes(pending, done_rate, inflow_rate),
}
def build_trend(
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
now_kst: datetime,
) -> list[dict]:
"""summarize 24h 추이 — KST 시간 버킷 24개 (오래된 것부터, 빈 버킷 0).
버킷 key = "YYYY-MM-DD HH:00" (KST). SQL to_char 출력과 동일 포맷.
"""
base = now_kst.replace(minute=0, second=0, microsecond=0)
trend = []
for i in range(23, -1, -1):
bucket = base - timedelta(hours=i)
key = bucket.strftime("%Y-%m-%d %H:00")
trend.append({
"hour": bucket.strftime("%H:00"),
"inflow": inflow_buckets.get(key, 0),
"done": done_buckets.get(key, 0),
})
return trend
def build_totals(stage_stats: dict[str, dict]) -> dict:
"""전 stage 합계."""
return {
"pending": sum(s["pending"] for s in stage_stats.values()),
"processing": sum(s["processing"] for s in stage_stats.values()),
"failed": sum(s["failed"] for s in stage_stats.values()),
}
def compose_overview(
stage_stats: dict[str, dict],
summarize_split: dict[str, dict],
inflow_buckets: dict[str, int],
done_buckets: dict[str, int],
current_rows: list[dict],
*,
deep_enabled: bool,
now_kst: datetime,
) -> dict:
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
return {
"machines": build_machines(
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
),
"summarize_eta": build_summarize_eta(stage_stats),
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
"totals": build_totals(stage_stats),
}
# ─── SQL 수집부 (총 5쿼리) ────────────────────────────────────────────────────
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
_STAGE_STATS_SQL = """
SELECT
stage,
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'processing') AS processing,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE status = 'completed'
AND completed_at > NOW() - INTERVAL '15 minutes') AS done_15m,
COUNT(*) FILTER (WHERE status = 'pending'
AND payload ->> 'deferred_until' IS NOT NULL
AND (payload ->> 'deferred_until')::timestamptz > NOW())
AS deferred_pending,
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') AS created_1h
FROM processing_queue
GROUP BY stage
"""
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방)
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
_SUMMARIZE_SPLIT_SQL = """
SELECT
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'summarize'
AND q.status = 'completed'
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
GROUP BY 1
"""
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
_TREND_INFLOW_SQL = """
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket,
COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""
_TREND_DONE_SQL = """
SELECT to_char(date_trunc('hour', completed_at AT TIME ZONE 'Asia/Seoul'),
'YYYY-MM-DD HH24:00') AS bucket,
COUNT(*) AS n
FROM processing_queue
WHERE stage = 'summarize'
AND status = 'completed'
AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY 1
"""
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
_CURRENT_SQL = """
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'processing'
ORDER BY q.started_at DESC NULLS LAST
LIMIT 50
"""
async def build_overview(session: AsyncSession) -> dict:
"""5쿼리 수집 → compose_overview 판정 → 응답 dict."""
now_kst = datetime.now(KST)
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
deep_enabled = settings.ai is not None and settings.ai.deep is not None
stage_rows = (
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
).all()
split_rows = (
await session.execute(
text(_SUMMARIZE_SPLIT_SQL),
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
)
).all()
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
current_result = (await session.execute(text(_CURRENT_SQL))).all()
current_rows = [
{
"stage": row[0],
"document_id": row[1],
"title": row[2],
"original_filename": row[3],
"file_path": row[4],
}
for row in current_result
]
return compose_overview(
rows_to_stage_stats(stage_rows),
rows_to_summarize_split(split_rows),
{row[0]: int(row[1]) for row in inflow_rows},
{row[0]: int(row[1]) for row in done_rows},
current_rows,
deep_enabled=deep_enabled,
now_kst=now_kst,
)