b91b05e889
2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정). - 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지), 맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크) - summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거 — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리) - 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류 변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체) - deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존) - 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존 - background_jobs 머신 귀속 기본값 gpu -> nas - 단위테스트 2노드 기준 재작성 (27 passed) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
194 lines
5.7 KiB
Python
194 lines
5.7 KiB
Python
"""처리 머신 보드 API — /api/queue/* (plan ds-processing-ui-6an → ds-board-engines-1).
|
|
|
|
- GET /overview: 홈 stage 평면 테이블을 "머신 관점 보드(누가 일하나)"로 — 집계
|
|
로직은 services/queue_overview.py (순수 판정부 분리). 응답 스키마는 FE 와
|
|
계약 고정. 응답에 raw 모델명 노출 금지 — 머신 label 만 (엔진/모델 표기는
|
|
FE 정적 맵 책임).
|
|
- GET /failed + POST /retry|/skip: 실패 처리 (ds-board-engines-1) — 영구 실패
|
|
(자동 재시도 3회 소진)의 유일한 사용자 조치 경로. 일괄 조치는 FE 가 그룹의
|
|
id 목록을 모아 보낸다 (서버측 패턴 매칭 없음 — raw 식별자/패턴 미수신).
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Annotated, Literal
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from models.user import User
|
|
from services.queue_overview import (
|
|
build_overview,
|
|
fetch_failed_items,
|
|
retry_failed,
|
|
skip_failed,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class CurrentItem(BaseModel):
|
|
"""머신이 지금 처리 중인 문서 (최대 2건)."""
|
|
document_id: int
|
|
title: str
|
|
stage: str
|
|
|
|
|
|
class MachineCard(BaseModel):
|
|
"""머신 카드 — stage 귀속 합산 + 완료 실적 + state (나스/맥미니 2노드)."""
|
|
key: Literal["nas", "macmini"]
|
|
label: str
|
|
state: Literal["active", "deferred", "idle"]
|
|
stages: list[str]
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
done_1h: int
|
|
done_today: int
|
|
deferred_pending: int
|
|
current: list[CurrentItem]
|
|
|
|
|
|
class SummarizeEta(BaseModel):
|
|
"""summarize 풀 ETA — done > inflow 일 때만 eta_minutes 산출."""
|
|
pending: int
|
|
done_rate_1h: int
|
|
inflow_rate_1h: int
|
|
eta_minutes: int | None
|
|
|
|
|
|
class TrendBucket(BaseModel):
|
|
"""summarize 24h 추이 버킷 — hour 는 KST "HH:00" 라벨."""
|
|
hour: str
|
|
inflow: int
|
|
done: int
|
|
|
|
|
|
class Totals(BaseModel):
|
|
"""전 stage 합계."""
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
|
|
|
|
class StageRow(BaseModel):
|
|
"""단계별 현황 행 — 흐름 노드/상세 패널용.
|
|
|
|
done_1h/created_1h = 처리율·유입률 (유입 우세 판정 + ETA 의 FE 재료,
|
|
ds-board-engines-1 추가 — 수집 SQL 에 이미 있던 값의 노출).
|
|
"""
|
|
stage: str
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
done_1h: int
|
|
created_1h: int
|
|
done_today: int
|
|
oldest_pending_age_sec: int | None
|
|
|
|
|
|
class BackgroundJobItem(BaseModel):
|
|
"""큐 밖 관리 스크립트(백필 등) 작업 — processing_queue 가 못 보는 사각지대 노출.
|
|
stale = running 인데 heartbeat 가 오래 끊김(프로세스 사망 추정)."""
|
|
id: int
|
|
kind: str
|
|
machine: str
|
|
label: str | None
|
|
state: Literal["running", "done", "failed"]
|
|
processed: int
|
|
total: int | None
|
|
elapsed_sec: int
|
|
stale: bool
|
|
error: str | None
|
|
|
|
|
|
class QueueOverviewResponse(BaseModel):
|
|
machines: list[MachineCard]
|
|
stages: list[StageRow]
|
|
summarize_eta: SummarizeEta
|
|
trend_24h: list[TrendBucket]
|
|
totals: Totals
|
|
background_jobs: list[BackgroundJobItem] = []
|
|
|
|
|
|
class FailedItem(BaseModel):
|
|
"""영구 실패 행 — 실패 드로어 표시 단위."""
|
|
id: int
|
|
stage: str
|
|
document_id: int
|
|
title: str
|
|
attempts: int
|
|
max_attempts: int
|
|
error_message: str | None
|
|
failed_at: datetime | None
|
|
|
|
|
|
class FailedListResponse(BaseModel):
|
|
items: list[FailedItem]
|
|
total: int
|
|
|
|
|
|
class QueueActionRequest(BaseModel):
|
|
"""재시도/건너뛰기 대상 — 실패 행 id 목록 (FE 가 그룹핑 후 전달)."""
|
|
ids: list[int] = Field(min_length=1, max_length=300)
|
|
|
|
|
|
class RetryResponse(BaseModel):
|
|
requested: int
|
|
retried: int
|
|
not_retried: int
|
|
|
|
|
|
class SkipResponse(BaseModel):
|
|
requested: int
|
|
skipped: int
|
|
not_skipped: int
|
|
|
|
|
|
@router.get("/overview", response_model=QueueOverviewResponse)
|
|
async def get_queue_overview(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""머신 관점 처리 보드 + summarize ETA 집계 (라이브 계산, 신규 테이블 0)"""
|
|
return QueueOverviewResponse.model_validate(await build_overview(session))
|
|
|
|
|
|
@router.get("/failed", response_model=FailedListResponse)
|
|
async def get_failed_items(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)"""
|
|
items = await fetch_failed_items(session)
|
|
return FailedListResponse(
|
|
items=[FailedItem.model_validate(i) for i in items],
|
|
total=len(items),
|
|
)
|
|
|
|
|
|
@router.post("/retry", response_model=RetryResponse)
|
|
async def retry_failed_items(
|
|
body: QueueActionRequest,
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""실패 행 재시도 — attempts 리셋 + pending 복귀.
|
|
|
|
not_retried = 같은 (문서, 단계) 의 active 행 충돌(uq_queue_active) 또는
|
|
이미 failed 가 아닌 행 (중복 클릭 등) — 건드리지 않고 건수만 보고.
|
|
"""
|
|
return RetryResponse.model_validate(await retry_failed(session, body.ids))
|
|
|
|
|
|
@router.post("/skip", response_model=SkipResponse)
|
|
async def skip_failed_items(
|
|
body: QueueActionRequest,
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""실패 행 건너뛰기 — completed 마킹(payload.skipped_by_user) + 연쇄 없음"""
|
|
return SkipResponse.model_validate(await skip_failed(session, body.ids))
|