feat(board): 처리 보드 v2 — 파이프라인 흐름 뷰·엔진 구분·실패 재시도/건너뛰기 (ds-board-engines-1)

- 흐름 뷰 메인: 좌→우 노드(머신·엔진 태그, 유입 우세 amber, 실패 뱃지) + 머신 스트립(모델 표기) + trend_24h 스파크라인 첫 렌더
- 노드 클릭 상세 패널: KV 4칸 + 다중 stage 행 + 지금 처리 중
- 실패 처리 드로어: 에러 패턴 그룹 + 재시도/건너뛰기 (영구 실패의 첫 사용자 조치 경로)
- API: stages[].done_1h/created_1h 노출 + GET /api/queue/failed + POST /api/queue/retry|/skip (uq_queue_active 충돌 skip, 건너뛰기는 enqueue_next_stage 미호출)
- 엔진/모델 표기 = queueDisplay.ts 정적 맵 단일 지점 (모델 교체 시 1곳)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-12 01:05:04 +00:00
parent 8ac1dbf4a8
commit 5581d3f1ce
7 changed files with 744 additions and 71 deletions
+104 -1
View File
@@ -22,7 +22,7 @@ from datetime import datetime, timedelta
from posixpath import basename
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy import bindparam, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
@@ -258,6 +258,8 @@ def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
"pending": st["pending"],
"processing": st["processing"],
"failed": st["failed"],
"done_1h": st["done_1h"],
"created_1h": st["created_1h"],
"done_today": st["done_today"],
"oldest_pending_age_sec": age,
})
@@ -408,3 +410,104 @@ async def build_overview(session: AsyncSession) -> dict:
deep_enabled=deep_enabled,
now_kst=now_kst,
)
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
# started_at 을 시각 fallback 으로 쓴다.
_FAILED_LIST_SQL = """
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
q.error_message,
COALESCE(q.completed_at, q.started_at) AS failed_at,
d.title, d.original_filename, d.file_path
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.status = 'failed'
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
LIMIT 300
"""
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
_RETRY_SQL = """
UPDATE processing_queue q
SET status = 'pending', attempts = 0,
started_at = NULL, completed_at = NULL
WHERE q.id IN :ids
AND q.status = 'failed'
AND NOT EXISTS (
SELECT 1 FROM processing_queue p
WHERE p.document_id = q.document_id
AND p.stage = q.stage
AND p.status IN ('pending', 'processing')
AND p.id <> q.id
)
RETURNING q.id
"""
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
_SKIP_SQL = """
UPDATE processing_queue
SET status = 'completed', completed_at = NOW(),
payload = COALESCE(payload, '{}'::jsonb)
|| jsonb_build_object('skipped_by_user', true,
'skipped_at', NOW()::text)
WHERE id IN :ids AND status = 'failed'
RETURNING id
"""
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
return [
{
"id": r[0],
"stage": r[1],
"document_id": r[2],
"attempts": int(r[3] or 0),
"max_attempts": int(r[4] or 0),
"error_message": r[5],
"failed_at": r[6],
"title": display_title({
"document_id": r[2],
"title": r[7],
"original_filename": r[8],
"file_path": r[9],
}),
}
for r in rows
]
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
unique_ids = list(set(ids))
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"retried": len(retried),
"not_retried": len(unique_ids) - len(retried),
}
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
unique_ids = list(set(ids))
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
await session.commit()
return {
"requested": len(unique_ids),
"skipped": len(skipped),
"not_skipped": len(unique_ids) - len(skipped),
}