4c111ca7f2
f325bd0 이 서비스 payload·frontend 타입엔 machine 을 넣었으나 API Pydantic
response_model(BackgroundJobItem)에 누락 → FastAPI 가 직렬화 시 탈락. 한 줄 추가.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
209 lines
6.2 KiB
Python
209 lines
6.2 KiB
Python
"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
|
|
|
|
- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
|
|
로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
|
|
계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
|
|
FE 정적 맵 책임).
|
|
- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
|
|
(자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
|
|
id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Annotated, Literal
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from models.user import User
|
|
from services.queue_overview import (
|
|
build_overview,
|
|
fetch_failed_items,
|
|
retry_failed,
|
|
skip_failed,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class CurrentItem(BaseModel):
|
|
"""머신이 지금 처리 중인 문서 (최대 2건)."""
|
|
document_id: int
|
|
title: str
|
|
stage: str
|
|
|
|
|
|
class MachineCard(BaseModel):
|
|
"""머신 카드 — stage 귀속 합산 + 완료 실적(summarize 는 풀 분리) + state."""
|
|
key: Literal["gpu", "macmini", "macbook"]
|
|
label: str
|
|
state: Literal["active", "deferred", "idle"]
|
|
stages: list[str]
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
done_1h: int
|
|
done_today: int
|
|
deferred_pending: int
|
|
current: list[CurrentItem]
|
|
|
|
|
|
class SummarizeEta(BaseModel):
|
|
"""summarize 풀 ETA — done > inflow 일 때만 eta_minutes 산출."""
|
|
pending: int
|
|
done_rate_1h: int
|
|
inflow_rate_1h: int
|
|
eta_minutes: int | None
|
|
|
|
|
|
class MachineDone(BaseModel):
|
|
"""머신 1대의 summarize 완료 실적 (분담 표시용)."""
|
|
done_1h: int
|
|
done_today: int
|
|
|
|
|
|
class SummarizeByMachine(BaseModel):
|
|
"""summarize 풀의 머신별 완료 실적 분담 — 보드 레인의 '맥미니 vs 맥북'
|
|
오프로드 가시화용. rows_to_summarize_split 이 이미 계산하던 값의 노출
|
|
(ds-board-merged A-1, 신규 수집 SQL 0)."""
|
|
macmini: MachineDone
|
|
macbook: MachineDone
|
|
|
|
|
|
class TrendBucket(BaseModel):
|
|
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
|
|
hour: str
|
|
inflow: int
|
|
done: int
|
|
|
|
|
|
class Totals(BaseModel):
|
|
"""전 stage 합계."""
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
|
|
|
|
class StageRow(BaseModel):
|
|
"""단계별 현황 행 — 흐름 노드/상세 패널용.
|
|
|
|
done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
|
|
ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
|
|
"""
|
|
stage: str
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
done_1h: int
|
|
created_1h: int
|
|
done_today: int
|
|
oldest_pending_age_sec: int | None
|
|
|
|
|
|
class BackgroundJobItem(BaseModel):
|
|
"""큐 밖 관리 스크립트(백필 등) 작업 — processing_queue 가 못 보는 사각지대 노출.
|
|
stale = running 인데 heartbeat 가 오래 끊김(프로세스 사망 추정)."""
|
|
id: int
|
|
kind: str
|
|
machine: str
|
|
label: str | None
|
|
state: Literal["running", "done", "failed"]
|
|
processed: int
|
|
total: int | None
|
|
elapsed_sec: int
|
|
stale: bool
|
|
error: str | None
|
|
|
|
|
|
class QueueOverviewResponse(BaseModel):
|
|
machines: list[MachineCard]
|
|
stages: list[StageRow]
|
|
summarize_eta: SummarizeEta
|
|
summarize_by_machine: SummarizeByMachine
|
|
trend_24h: list[TrendBucket]
|
|
totals: Totals
|
|
background_jobs: list[BackgroundJobItem] = []
|
|
|
|
|
|
class FailedItem(BaseModel):
|
|
"""영구 실패 행 — 실패 드로어 표시 단위."""
|
|
id: int
|
|
stage: str
|
|
document_id: int
|
|
title: str
|
|
attempts: int
|
|
max_attempts: int
|
|
error_message: str | None
|
|
failed_at: datetime | None
|
|
|
|
|
|
class FailedListResponse(BaseModel):
|
|
items: list[FailedItem]
|
|
total: int
|
|
|
|
|
|
class QueueActionRequest(BaseModel):
|
|
"""재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
|
|
ids: list[int] = Field(min_length=1, max_length=300)
|
|
|
|
|
|
class RetryResponse(BaseModel):
|
|
requested: int
|
|
retried: int
|
|
not_retried: int
|
|
|
|
|
|
class SkipResponse(BaseModel):
|
|
requested: int
|
|
skipped: int
|
|
not_skipped: int
|
|
|
|
|
|
@router.get("/overview", response_model=QueueOverviewResponse)
|
|
async def get_queue_overview(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
|
|
return QueueOverviewResponse.model_validate(await build_overview(session))
|
|
|
|
|
|
@router.get("/failed", response_model=FailedListResponse)
|
|
async def get_failed_items(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
|
|
items = await fetch_failed_items(session)
|
|
return FailedListResponse(
|
|
items=[FailedItem.model_validate(i) for i in items],
|
|
total=len(items),
|
|
)
|
|
|
|
|
|
@router.post("/retry", response_model=RetryResponse)
|
|
async def retry_failed_items(
|
|
body: QueueActionRequest,
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""실패 행 재시도 — attempts 리셋 + pending 복귀.
|
|
|
|
not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
|
|
이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
|
|
"""
|
|
return RetryResponse.model_validate(await retry_failed(session, body.ids))
|
|
|
|
|
|
@router.post("/skip", response_model=SkipResponse)
|
|
async def skip_failed_items(
|
|
body: QueueActionRequest,
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
|
|
return SkipResponse.model_validate(await skip_failed(session, body.ids))
|