b630c31077
요약 풀의 머신별 완료 실적(맥미니 vs 맥북)을 /api/queue/overview 응답에 summarize_by_machine 로 노출. rows_to_summarize_split 이 이미 계산하던 값의 additive 투영 — 신규 수집 SQL/마이그 0. 통합 보드 레인의 오프로드 가시화 (맥북이 요약 86% 처리) 재료. + FE 타입 동기 + store 신선도 timestamp(B-4). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
525 lines
21 KiB
Python
525 lines
21 KiB
Python
"""처리 머신 보드 + ETA 집계 (plan ds-processing-ui-6an, 안2+안5/6).
|
||
|
||
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
|
||
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
|
||
|
||
구조: SQL 수집부(build_overview 내부 5쿼리)와 판정부(순수 함수)를 분리.
|
||
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
|
||
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
|
||
|
||
귀속 규칙 (단일 진실):
|
||
- stage→machine 정적 맵: gpu = extract/embed/chunk/markdown/preview/thumbnail/
|
||
fulltext/stt · macmini = classify/summarize · macbook = deep_summary
|
||
(단, settings.ai.deep 부재 시 deep_summary 도 macmini 귀속).
|
||
- summarize 는 풀(pool): pending/processing/failed 는 macmini 귀속이되, 완료
|
||
실적(done_*)은 documents.ai_model_version 조인으로 분리 — 'qwen-macbook'
|
||
이면 macbook 실적, 아니면 macmini 실적.
|
||
- deferred_pending(payload.deferred_until 미래)은 macbook 카드 귀속
|
||
(보류 = 맥북 불가 신호).
|
||
"""
|
||
|
||
from datetime import datetime, timedelta
|
||
from posixpath import basename
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from sqlalchemy import bindparam, text
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from core.config import settings
|
||
|
||
KST = ZoneInfo("Asia/Seoul")
|
||
|
||
# 내부 판별용 alias — 응답에 raw 모델명 노출 금지, 머신 label 만 노출.
|
||
_MACBOOK_MODEL_ALIAS = "qwen-macbook"
|
||
|
||
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
|
||
_GPU_STAGES = (
|
||
"extract", "embed", "chunk", "markdown",
|
||
"preview", "thumbnail", "fulltext", "stt",
|
||
)
|
||
_MACMINI_STAGES = ("classify", "summarize")
|
||
_MACBOOK_STAGES = ("deep_summary",)
|
||
_STAGE_ORDER = _GPU_STAGES + _MACMINI_STAGES + _MACBOOK_STAGES
|
||
|
||
_MACHINE_KEYS = ("gpu", "macmini", "macbook")
|
||
_MACHINE_LABELS = {
|
||
"gpu": "GPU 서버",
|
||
"macmini": "맥미니",
|
||
"macbook": "맥북 M5 Max",
|
||
}
|
||
|
||
# 머신 카드당 current 표시 상한
|
||
_CURRENT_LIMIT = 2
|
||
|
||
|
||
def stage_machine_map(deep_enabled: bool) -> dict[str, str]:
|
||
"""stage → machine key 맵. deep 슬롯 부재 시 deep_summary 는 macmini 귀속."""
|
||
mapping: dict[str, str] = {}
|
||
for s in _GPU_STAGES:
|
||
mapping[s] = "gpu"
|
||
for s in _MACMINI_STAGES:
|
||
mapping[s] = "macmini"
|
||
for s in _MACBOOK_STAGES:
|
||
mapping[s] = "macbook" if deep_enabled else "macmini"
|
||
return mapping
|
||
|
||
|
||
def _zero_stage() -> dict:
|
||
return {
|
||
"pending": 0, "processing": 0, "failed": 0,
|
||
"done_1h": 0, "done_today": 0, "done_15m": 0,
|
||
"deferred_pending": 0, "created_1h": 0, "oldest_pending_at": None,
|
||
}
|
||
|
||
|
||
def rows_to_stage_stats(rows) -> dict[str, dict]:
|
||
"""stage×status 집계 쿼리 행 → {stage: {pending, ..., created_1h}} 변환."""
|
||
stats: dict[str, dict] = {}
|
||
for row in rows:
|
||
stats[row[0]] = {
|
||
"pending": int(row[1] or 0),
|
||
"processing": int(row[2] or 0),
|
||
"failed": int(row[3] or 0),
|
||
"done_1h": int(row[4] or 0),
|
||
"done_today": int(row[5] or 0),
|
||
"done_15m": int(row[6] or 0),
|
||
"deferred_pending": int(row[7] or 0),
|
||
"created_1h": int(row[8] or 0),
|
||
"oldest_pending_at": row[9] if len(row) > 9 else None,
|
||
}
|
||
return stats
|
||
|
||
|
||
def rows_to_summarize_split(rows) -> dict[str, dict]:
|
||
"""summarize 완료 실적 분리 쿼리 행 → {"macbook"|"macmini": {done_*}}.
|
||
|
||
is_macbook = documents.ai_model_version 이 'qwen-macbook' 인지 (내부 판별 전용).
|
||
"""
|
||
split = {
|
||
"macbook": {"done_1h": 0, "done_today": 0, "done_15m": 0},
|
||
"macmini": {"done_1h": 0, "done_today": 0, "done_15m": 0},
|
||
}
|
||
for row in rows:
|
||
key = "macbook" if row[0] else "macmini"
|
||
split[key]["done_1h"] += int(row[1] or 0)
|
||
split[key]["done_today"] += int(row[2] or 0)
|
||
split[key]["done_15m"] += int(row[3] or 0)
|
||
return split
|
||
|
||
|
||
def display_title(row: dict) -> str:
|
||
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
|
||
if row.get("title"):
|
||
return row["title"]
|
||
if row.get("original_filename"):
|
||
return row["original_filename"]
|
||
if row.get("file_path"):
|
||
return basename(row["file_path"].rstrip("/"))
|
||
return f"문서 #{row['document_id']}"
|
||
|
||
|
||
def build_machines(
|
||
stage_stats: dict[str, dict],
|
||
summarize_split: dict[str, dict],
|
||
current_rows: list[dict],
|
||
*,
|
||
deep_enabled: bool,
|
||
) -> list[dict]:
|
||
"""머신 카드 3장 (gpu / macmini / macbook) 구성 — 귀속 규칙의 판정부."""
|
||
smap = stage_machine_map(deep_enabled)
|
||
|
||
def g(stage: str, field: str) -> int:
|
||
return stage_stats.get(stage, {}).get(field, 0)
|
||
|
||
# current 귀속: processing 행을 머신별 최대 2건 (summarize processing → macmini)
|
||
current_by_machine: dict[str, list[dict]] = {k: [] for k in _MACHINE_KEYS}
|
||
for row in current_rows:
|
||
machine = smap.get(row["stage"])
|
||
if machine and len(current_by_machine[machine]) < _CURRENT_LIMIT:
|
||
current_by_machine[machine].append({
|
||
"document_id": row["document_id"],
|
||
"title": display_title(row),
|
||
"stage": row["stage"],
|
||
})
|
||
|
||
machines = []
|
||
for key in _MACHINE_KEYS:
|
||
stages = [s for s in _STAGE_ORDER if smap[s] == key]
|
||
|
||
pending = sum(g(s, "pending") for s in stages)
|
||
processing = sum(g(s, "processing") for s in stages)
|
||
failed = sum(g(s, "failed") for s in stages)
|
||
|
||
# 완료 실적: summarize 는 풀이라 stage 합산에서 제외하고 split 로 귀속
|
||
done_1h = sum(g(s, "done_1h") for s in stages if s != "summarize")
|
||
done_today = sum(g(s, "done_today") for s in stages if s != "summarize")
|
||
done_15m = sum(g(s, "done_15m") for s in stages if s != "summarize")
|
||
if key in summarize_split:
|
||
done_1h += summarize_split[key]["done_1h"]
|
||
done_today += summarize_split[key]["done_today"]
|
||
done_15m += summarize_split[key]["done_15m"]
|
||
|
||
# 보류 백오프 = 맥북 불가 신호 → macbook 카드 귀속 (deep 슬롯 유무 무관)
|
||
deferred_pending = (
|
||
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
|
||
if key == "macbook" else 0
|
||
)
|
||
|
||
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
|
||
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
|
||
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
|
||
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(sleep/불가 지속)에서만.
|
||
if processing > 0 or done_15m > 0:
|
||
state = "active"
|
||
elif key == "macbook" and deferred_pending > 0:
|
||
state = "deferred"
|
||
else:
|
||
state = "idle"
|
||
|
||
machines.append({
|
||
"key": key,
|
||
"label": _MACHINE_LABELS[key],
|
||
"state": state,
|
||
"stages": stages,
|
||
"pending": pending,
|
||
"processing": processing,
|
||
"failed": failed,
|
||
"done_1h": done_1h,
|
||
"done_today": done_today,
|
||
"deferred_pending": deferred_pending,
|
||
"current": current_by_machine[key],
|
||
})
|
||
return machines
|
||
|
||
|
||
def compute_eta_minutes(pending: int, done_1h: int, inflow_1h: int) -> int | None:
|
||
"""ETA(분) = 순소화율 기반. done > inflow 일 때만 산출, 아니면 None (소화 불가)."""
|
||
if done_1h > inflow_1h:
|
||
return round(pending / (done_1h - inflow_1h) * 60)
|
||
return None
|
||
|
||
|
||
def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
|
||
"""summarize 풀 ETA — pending 은 보류(deferred) 포함 총수."""
|
||
s = stage_stats.get("summarize", _zero_stage())
|
||
pending = s["pending"]
|
||
done_rate = s["done_1h"]
|
||
inflow_rate = s["created_1h"]
|
||
return {
|
||
"pending": pending,
|
||
"done_rate_1h": done_rate,
|
||
"inflow_rate_1h": inflow_rate,
|
||
"eta_minutes": compute_eta_minutes(pending, done_rate, inflow_rate),
|
||
}
|
||
|
||
|
||
def build_summarize_by_machine(summarize_split: dict[str, dict]) -> dict:
|
||
"""summarize 머신별 완료 실적 분담 (macmini vs macbook) — 보드 레인의
|
||
오프로드 가시화용. rows_to_summarize_split 이 이미 만든 값을 응답 형태로
|
||
투영(done_1h/done_today 만, done_15m 은 내부 state 판정 전용이라 제외)."""
|
||
def m(key: str) -> dict:
|
||
s = summarize_split.get(key, {})
|
||
return {"done_1h": int(s.get("done_1h", 0)), "done_today": int(s.get("done_today", 0))}
|
||
return {"macmini": m("macmini"), "macbook": m("macbook")}
|
||
|
||
|
||
def build_trend(
|
||
inflow_buckets: dict[str, int],
|
||
done_buckets: dict[str, int],
|
||
now_kst: datetime,
|
||
) -> list[dict]:
|
||
"""summarize 24h 추이 — KST 시간 버킷 24개 (오래된 것부터, 빈 버킷 0).
|
||
|
||
버킷 key = "YYYY-MM-DD HH:00" (KST). SQL to_char 출력과 동일 포맷.
|
||
"""
|
||
base = now_kst.replace(minute=0, second=0, microsecond=0)
|
||
trend = []
|
||
for i in range(23, -1, -1):
|
||
bucket = base - timedelta(hours=i)
|
||
key = bucket.strftime("%Y-%m-%d %H:00")
|
||
trend.append({
|
||
"hour": bucket.strftime("%H:00"),
|
||
"inflow": inflow_buckets.get(key, 0),
|
||
"done": done_buckets.get(key, 0),
|
||
})
|
||
return trend
|
||
|
||
|
||
def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
|
||
"""단계별 현황 행 — '단계 상세' 패널용 (2026-06-11 사용자 피드백: 완료가 보여야 한다).
|
||
|
||
파이프라인 순서 유지, 미지 stage 는 뒤에. 숨김/강조 판단은 FE 몫 — 여기선 사실만.
|
||
oldest_pending_age_sec = 가장 오래된 pending 의 경과 초 (pending 없으면 None).
|
||
"""
|
||
from datetime import datetime, timezone
|
||
now = now or datetime.now(timezone.utc)
|
||
extra = [s for s in stage_stats if s not in _STAGE_ORDER]
|
||
rows = []
|
||
for stage in [*_STAGE_ORDER, *extra]:
|
||
st = stage_stats.get(stage) or _zero_stage()
|
||
oldest = st.get("oldest_pending_at")
|
||
age = None
|
||
if oldest is not None:
|
||
if oldest.tzinfo is None:
|
||
oldest = oldest.replace(tzinfo=timezone.utc)
|
||
age = max(0, int((now - oldest).total_seconds()))
|
||
rows.append({
|
||
"stage": stage,
|
||
"pending": st["pending"],
|
||
"processing": st["processing"],
|
||
"failed": st["failed"],
|
||
"done_1h": st["done_1h"],
|
||
"created_1h": st["created_1h"],
|
||
"done_today": st["done_today"],
|
||
"oldest_pending_age_sec": age,
|
||
})
|
||
return rows
|
||
|
||
|
||
def build_totals(stage_stats: dict[str, dict]) -> dict:
|
||
"""전 stage 합계."""
|
||
return {
|
||
"pending": sum(s["pending"] for s in stage_stats.values()),
|
||
"processing": sum(s["processing"] for s in stage_stats.values()),
|
||
"failed": sum(s["failed"] for s in stage_stats.values()),
|
||
}
|
||
|
||
|
||
def compose_overview(
|
||
stage_stats: dict[str, dict],
|
||
summarize_split: dict[str, dict],
|
||
inflow_buckets: dict[str, int],
|
||
done_buckets: dict[str, int],
|
||
current_rows: list[dict],
|
||
*,
|
||
deep_enabled: bool,
|
||
now_kst: datetime,
|
||
) -> dict:
|
||
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
|
||
return {
|
||
"machines": build_machines(
|
||
stage_stats, summarize_split, current_rows, deep_enabled=deep_enabled
|
||
),
|
||
"stages": build_stages(stage_stats),
|
||
"summarize_eta": build_summarize_eta(stage_stats),
|
||
"summarize_by_machine": build_summarize_by_machine(summarize_split),
|
||
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
|
||
"totals": build_totals(stage_stats),
|
||
}
|
||
|
||
|
||
# ─── SQL 수집부 (총 5쿼리) ────────────────────────────────────────────────────
|
||
|
||
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
|
||
_STAGE_STATS_SQL = """
|
||
SELECT
|
||
stage,
|
||
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
|
||
COUNT(*) FILTER (WHERE status = 'processing') AS processing,
|
||
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > :kst_midnight) AS done_today,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '15 minutes') AS done_15m,
|
||
COUNT(*) FILTER (WHERE status = 'pending'
|
||
AND payload ->> 'deferred_until' IS NOT NULL
|
||
AND (payload ->> 'deferred_until')::timestamptz > NOW())
|
||
AS deferred_pending,
|
||
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') AS created_1h,
|
||
MIN(created_at) FILTER (WHERE status = 'pending') AS oldest_pending_at
|
||
FROM processing_queue
|
||
GROUP BY stage
|
||
"""
|
||
|
||
# 2) summarize 풀 완료 실적 분리 (documents.ai_model_version 조인, 1방)
|
||
# 스캔 하한 = 오늘 0시(KST)와 1h 전 중 더 이른 시각 (자정 직후 1h 창 보전).
|
||
_SUMMARIZE_SPLIT_SQL = """
|
||
SELECT
|
||
COALESCE(d.ai_model_version = :macbook_alias, false) AS is_macbook,
|
||
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
|
||
COUNT(*) FILTER (WHERE q.completed_at > :kst_midnight) AS done_today,
|
||
COUNT(*) FILTER (WHERE q.completed_at > NOW() - INTERVAL '15 minutes') AS done_15m
|
||
FROM processing_queue q
|
||
JOIN documents d ON d.id = q.document_id
|
||
WHERE q.stage = 'summarize'
|
||
AND q.status = 'completed'
|
||
AND q.completed_at > LEAST(:kst_midnight, NOW() - INTERVAL '1 hour')
|
||
GROUP BY 1
|
||
"""
|
||
|
||
# 3/4) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
|
||
_TREND_INFLOW_SQL = """
|
||
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
|
||
'YYYY-MM-DD HH24:00') AS bucket,
|
||
COUNT(*) AS n
|
||
FROM processing_queue
|
||
WHERE stage = 'summarize'
|
||
AND created_at > NOW() - INTERVAL '24 hours'
|
||
GROUP BY 1
|
||
"""
|
||
|
||
_TREND_DONE_SQL = """
|
||
SELECT to_char(date_trunc('hour', completed_at AT TIME ZONE 'Asia/Seoul'),
|
||
'YYYY-MM-DD HH24:00') AS bucket,
|
||
COUNT(*) AS n
|
||
FROM processing_queue
|
||
WHERE stage = 'summarize'
|
||
AND status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '24 hours'
|
||
GROUP BY 1
|
||
"""
|
||
|
||
# 5) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
|
||
_CURRENT_SQL = """
|
||
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
|
||
FROM processing_queue q
|
||
JOIN documents d ON d.id = q.document_id
|
||
WHERE q.status = 'processing'
|
||
ORDER BY q.started_at DESC NULLS LAST
|
||
LIMIT 50
|
||
"""
|
||
|
||
|
||
async def build_overview(session: AsyncSession) -> dict:
|
||
"""5쿼리 수집 → compose_overview 판정 → 응답 dict."""
|
||
now_kst = datetime.now(KST)
|
||
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
deep_enabled = settings.ai is not None and settings.ai.deep is not None
|
||
|
||
stage_rows = (
|
||
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
|
||
).all()
|
||
split_rows = (
|
||
await session.execute(
|
||
text(_SUMMARIZE_SPLIT_SQL),
|
||
{"kst_midnight": kst_midnight, "macbook_alias": _MACBOOK_MODEL_ALIAS},
|
||
)
|
||
).all()
|
||
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
|
||
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
|
||
current_result = (await session.execute(text(_CURRENT_SQL))).all()
|
||
|
||
current_rows = [
|
||
{
|
||
"stage": row[0],
|
||
"document_id": row[1],
|
||
"title": row[2],
|
||
"original_filename": row[3],
|
||
"file_path": row[4],
|
||
}
|
||
for row in current_result
|
||
]
|
||
|
||
return compose_overview(
|
||
rows_to_stage_stats(stage_rows),
|
||
rows_to_summarize_split(split_rows),
|
||
{row[0]: int(row[1]) for row in inflow_rows},
|
||
{row[0]: int(row[1]) for row in done_rows},
|
||
current_rows,
|
||
deep_enabled=deep_enabled,
|
||
now_kst=now_kst,
|
||
)
|
||
|
||
|
||
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
|
||
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
|
||
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
|
||
|
||
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
|
||
# started_at 을 시각 fallback 으로 쓴다.
|
||
_FAILED_LIST_SQL = """
|
||
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
|
||
q.error_message,
|
||
COALESCE(q.completed_at, q.started_at) AS failed_at,
|
||
d.title, d.original_filename, d.file_path
|
||
FROM processing_queue q
|
||
JOIN documents d ON d.id = q.document_id
|
||
WHERE q.status = 'failed'
|
||
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
|
||
LIMIT 300
|
||
"""
|
||
|
||
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
|
||
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
|
||
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
|
||
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
|
||
_RETRY_SQL = """
|
||
UPDATE processing_queue q
|
||
SET status = 'pending', attempts = 0,
|
||
started_at = NULL, completed_at = NULL
|
||
WHERE q.id IN :ids
|
||
AND q.status = 'failed'
|
||
AND NOT EXISTS (
|
||
SELECT 1 FROM processing_queue p
|
||
WHERE p.document_id = q.document_id
|
||
AND p.stage = q.stage
|
||
AND p.status IN ('pending', 'processing')
|
||
AND p.id <> q.id
|
||
)
|
||
RETURNING q.id
|
||
"""
|
||
|
||
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
|
||
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
|
||
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
|
||
_SKIP_SQL = """
|
||
UPDATE processing_queue
|
||
SET status = 'completed', completed_at = NOW(),
|
||
payload = COALESCE(payload, '{}'::jsonb)
|
||
|| jsonb_build_object('skipped_by_user', true,
|
||
'skipped_at', NOW()::text)
|
||
WHERE id IN :ids AND status = 'failed'
|
||
RETURNING id
|
||
"""
|
||
|
||
|
||
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
|
||
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
|
||
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
|
||
return [
|
||
{
|
||
"id": r[0],
|
||
"stage": r[1],
|
||
"document_id": r[2],
|
||
"attempts": int(r[3] or 0),
|
||
"max_attempts": int(r[4] or 0),
|
||
"error_message": r[5],
|
||
"failed_at": r[6],
|
||
"title": display_title({
|
||
"document_id": r[2],
|
||
"title": r[7],
|
||
"original_filename": r[8],
|
||
"file_path": r[9],
|
||
}),
|
||
}
|
||
for r in rows
|
||
]
|
||
|
||
|
||
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
|
||
unique_ids = list(set(ids))
|
||
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
|
||
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||
await session.commit()
|
||
return {
|
||
"requested": len(unique_ids),
|
||
"retried": len(retried),
|
||
"not_retried": len(unique_ids) - len(retried),
|
||
}
|
||
|
||
|
||
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
|
||
unique_ids = list(set(ids))
|
||
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
|
||
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||
await session.commit()
|
||
return {
|
||
"requested": len(unique_ids),
|
||
"skipped": len(skipped),
|
||
"not_skipped": len(unique_ids) - len(skipped),
|
||
}
|