b91b05e889
2026-07-02 컷오버 반영 — GPU 서버 퇴역, 맥북 night-drain 보류(06-29 결정). - 레인 2개: 나스(추출/마크다운/청크·임베딩 등 DS 본체 Docker 스테이지), 맥미니(분류/요약/심층분석 — 단일 생성 LLM 허브 + bge-m3/리랭크) - summarize 풀 분리(summarize_by_machine·ai_model_version 조인 SQL) 제거 — FE 유일 소비자 확인 후 응답 스키마에서 정리 (5쿼리 -> 4쿼리) - 맥북 전제 UI 제거: 요약 오프로드 분담막대·요약 합류 칩·번다운 합류 변곡점 마커·잠듦 문구·전역 스트립 맥북 칩(맥미니 칩으로 대체) - deferred_pending = LLM 백오프 신호로 맥미니 카드 귀속 (기능 보존) - 번다운 차트·정직 ETA·실패 드로어·백그라운드 작업 등 머신 무관 기능 보존 - background_jobs 머신 귀속 기본값 gpu -> nas - 단위테스트 2노드 기준 재작성 (27 passed) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
496 lines
19 KiB
Python
496 lines
19 KiB
Python
"""처리 머신 보드 + ETA 집계 (plan ds-processing-ui-6an, 안2+안5/6).
|
||
|
||
GET /api/queue/overview 의 집계 로직. 모든 수치는 기존 processing_queue /
|
||
documents 컬럼에서 라이브 계산 — 신규 테이블/마이그레이션 0 (HARD 제약).
|
||
|
||
구조: SQL 수집부(build_overview 내부 4쿼리)와 판정부(순수 함수)를 분리.
|
||
판정부(rows_to_* / build_machines / build_summarize_eta / build_trend /
|
||
build_totals / compute_eta_minutes)는 DB 없이 단위테스트 가능.
|
||
|
||
귀속 규칙 (단일 진실 — 2026-07-02 컷오버 후 나스+맥미니 2노드):
|
||
- stage→machine 정적 맵: nas = extract/embed/chunk/markdown/preview/thumbnail/
|
||
fulltext/stt (DS 본체 Docker — 임베딩·리랭크 모델 콜은 맥미니로 나감) ·
|
||
macmini = classify/summarize/deep_summary (단일 생성 LLM 허브).
|
||
- deferred_pending(payload.deferred_until 미래)은 LLM 백오프 신호 —
|
||
summarize/deep_summary 소속인 macmini 카드 귀속.
|
||
"""
|
||
|
||
from datetime import datetime, timedelta
|
||
from posixpath import basename
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from sqlalchemy import bindparam, text
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
KST = ZoneInfo("Asia/Seoul")
|
||
|
||
# stage→machine 정적 맵 재료 (선언 순서 = 카드 stages 표시 순서)
|
||
_NAS_STAGES = (
|
||
"extract", "embed", "chunk", "markdown",
|
||
"preview", "thumbnail", "fulltext", "stt",
|
||
)
|
||
_MACMINI_STAGES = ("classify", "summarize", "deep_summary")
|
||
_STAGE_ORDER = _NAS_STAGES + _MACMINI_STAGES
|
||
|
||
_MACHINE_KEYS = ("nas", "macmini")
|
||
_MACHINE_LABELS = {
|
||
"nas": "나스",
|
||
"macmini": "맥미니",
|
||
}
|
||
|
||
# 머신 카드당 current 표시 상한
|
||
_CURRENT_LIMIT = 2
|
||
|
||
|
||
def stage_machine_map() -> dict[str, str]:
|
||
"""stage → machine key 맵 (정적 — 나스/맥미니 2노드)."""
|
||
mapping: dict[str, str] = {}
|
||
for s in _NAS_STAGES:
|
||
mapping[s] = "nas"
|
||
for s in _MACMINI_STAGES:
|
||
mapping[s] = "macmini"
|
||
return mapping
|
||
|
||
|
||
def _zero_stage() -> dict:
|
||
return {
|
||
"pending": 0, "processing": 0, "failed": 0,
|
||
"done_1h": 0, "done_today": 0, "done_15m": 0,
|
||
"deferred_pending": 0, "created_1h": 0, "oldest_pending_at": None,
|
||
}
|
||
|
||
|
||
def rows_to_stage_stats(rows) -> dict[str, dict]:
|
||
"""stage×status 집계 쿼리 행 → {stage: {pending, ..., created_1h}} 변환."""
|
||
stats: dict[str, dict] = {}
|
||
for row in rows:
|
||
stats[row[0]] = {
|
||
"pending": int(row[1] or 0),
|
||
"processing": int(row[2] or 0),
|
||
"failed": int(row[3] or 0),
|
||
"done_1h": int(row[4] or 0),
|
||
"done_today": int(row[5] or 0),
|
||
"done_15m": int(row[6] or 0),
|
||
"deferred_pending": int(row[7] or 0),
|
||
"created_1h": int(row[8] or 0),
|
||
"oldest_pending_at": row[9] if len(row) > 9 else None,
|
||
}
|
||
return stats
|
||
|
||
|
||
def display_title(row: dict) -> str:
|
||
"""표시용 제목 — title > original_filename > file_path basename > 문서 id."""
|
||
if row.get("title"):
|
||
return row["title"]
|
||
if row.get("original_filename"):
|
||
return row["original_filename"]
|
||
if row.get("file_path"):
|
||
return basename(row["file_path"].rstrip("/"))
|
||
return f"문서 #{row['document_id']}"
|
||
|
||
|
||
def build_machines(
|
||
stage_stats: dict[str, dict],
|
||
current_rows: list[dict],
|
||
) -> list[dict]:
|
||
"""머신 카드 2장 (nas / macmini) 구성 — 귀속 규칙의 판정부."""
|
||
smap = stage_machine_map()
|
||
|
||
def g(stage: str, field: str) -> int:
|
||
return stage_stats.get(stage, {}).get(field, 0)
|
||
|
||
# current 귀속: processing 행을 머신별 최대 2건 (summarize processing → macmini)
|
||
current_by_machine: dict[str, list[dict]] = {k: [] for k in _MACHINE_KEYS}
|
||
for row in current_rows:
|
||
machine = smap.get(row["stage"])
|
||
if machine and len(current_by_machine[machine]) < _CURRENT_LIMIT:
|
||
current_by_machine[machine].append({
|
||
"document_id": row["document_id"],
|
||
"title": display_title(row),
|
||
"stage": row["stage"],
|
||
})
|
||
|
||
machines = []
|
||
for key in _MACHINE_KEYS:
|
||
stages = [s for s in _STAGE_ORDER if smap[s] == key]
|
||
|
||
pending = sum(g(s, "pending") for s in stages)
|
||
processing = sum(g(s, "processing") for s in stages)
|
||
failed = sum(g(s, "failed") for s in stages)
|
||
done_1h = sum(g(s, "done_1h") for s in stages)
|
||
done_today = sum(g(s, "done_today") for s in stages)
|
||
done_15m = sum(g(s, "done_15m") for s in stages)
|
||
|
||
# 보류 백오프 = LLM 불가 신호 → LLM stage 소속인 macmini 카드 귀속
|
||
deferred_pending = (
|
||
g("summarize", "deferred_pending") + g("deep_summary", "deferred_pending")
|
||
if key == "macmini" else 0
|
||
)
|
||
|
||
# state 판정 — 우선순위: 가동 > 보류 > 대기 (사용자 피드백 2026-06-11).
|
||
# 일하고 있으면(처리 중 또는 최근 15분 완료) 백오프 잔여가 있어도 "가동" —
|
||
# 보류 건수는 카드의 deferred_pending 라인이 따로 보여준다. "보류" 칩은
|
||
# 실제로 일이 멈춰 있고 백오프만 쌓인 상태(LLM 허브 불가 지속)에서만.
|
||
if processing > 0 or done_15m > 0:
|
||
state = "active"
|
||
elif deferred_pending > 0:
|
||
state = "deferred"
|
||
else:
|
||
state = "idle"
|
||
|
||
machines.append({
|
||
"key": key,
|
||
"label": _MACHINE_LABELS[key],
|
||
"state": state,
|
||
"stages": stages,
|
||
"pending": pending,
|
||
"processing": processing,
|
||
"failed": failed,
|
||
"done_1h": done_1h,
|
||
"done_today": done_today,
|
||
"deferred_pending": deferred_pending,
|
||
"current": current_by_machine[key],
|
||
})
|
||
return machines
|
||
|
||
|
||
def compute_eta_minutes(pending: int, done_1h: int, inflow_1h: int) -> int | None:
|
||
"""ETA(분) = 순소화율 기반. done > inflow 일 때만 산출, 아니면 None (소화 불가)."""
|
||
if done_1h > inflow_1h:
|
||
return round(pending / (done_1h - inflow_1h) * 60)
|
||
return None
|
||
|
||
|
||
def build_summarize_eta(stage_stats: dict[str, dict]) -> dict:
|
||
"""summarize 풀 ETA — pending 은 보류(deferred) 포함 총수."""
|
||
s = stage_stats.get("summarize", _zero_stage())
|
||
pending = s["pending"]
|
||
done_rate = s["done_1h"]
|
||
inflow_rate = s["created_1h"]
|
||
return {
|
||
"pending": pending,
|
||
"done_rate_1h": done_rate,
|
||
"inflow_rate_1h": inflow_rate,
|
||
"eta_minutes": compute_eta_minutes(pending, done_rate, inflow_rate),
|
||
}
|
||
|
||
|
||
def build_trend(
|
||
inflow_buckets: dict[str, int],
|
||
done_buckets: dict[str, int],
|
||
now_kst: datetime,
|
||
) -> list[dict]:
|
||
"""summarize 24h 추이 — KST 시간 버킷 24개 (오래된 것부터, 빈 버킷 0).
|
||
|
||
버킷 key = "YYYY-MM-DD HH:00" (KST). SQL to_char 출력과 동일 포맷.
|
||
"""
|
||
base = now_kst.replace(minute=0, second=0, microsecond=0)
|
||
trend = []
|
||
for i in range(23, -1, -1):
|
||
bucket = base - timedelta(hours=i)
|
||
key = bucket.strftime("%Y-%m-%d %H:00")
|
||
trend.append({
|
||
"hour": bucket.strftime("%H:00"),
|
||
"inflow": inflow_buckets.get(key, 0),
|
||
"done": done_buckets.get(key, 0),
|
||
})
|
||
return trend
|
||
|
||
|
||
def build_stages(stage_stats: dict[str, dict], now=None) -> list[dict]:
|
||
"""단계별 현황 행 — '단계 상세' 패널용 (2026-06-11 사용자 피드백: 완료가 보여야 한다).
|
||
|
||
파이프라인 순서 유지, 미지 stage 는 뒤에. 숨김/강조 판단은 FE 몫 — 여기선 사실만.
|
||
oldest_pending_age_sec = 가장 오래된 pending 의 경과 초 (pending 없으면 None).
|
||
"""
|
||
from datetime import datetime, timezone
|
||
now = now or datetime.now(timezone.utc)
|
||
extra = [s for s in stage_stats if s not in _STAGE_ORDER]
|
||
rows = []
|
||
for stage in [*_STAGE_ORDER, *extra]:
|
||
st = stage_stats.get(stage) or _zero_stage()
|
||
oldest = st.get("oldest_pending_at")
|
||
age = None
|
||
if oldest is not None:
|
||
if oldest.tzinfo is None:
|
||
oldest = oldest.replace(tzinfo=timezone.utc)
|
||
age = max(0, int((now - oldest).total_seconds()))
|
||
rows.append({
|
||
"stage": stage,
|
||
"pending": st["pending"],
|
||
"processing": st["processing"],
|
||
"failed": st["failed"],
|
||
"done_1h": st["done_1h"],
|
||
"created_1h": st["created_1h"],
|
||
"done_today": st["done_today"],
|
||
"oldest_pending_age_sec": age,
|
||
})
|
||
return rows
|
||
|
||
|
||
def build_totals(stage_stats: dict[str, dict]) -> dict:
|
||
"""전 stage 합계."""
|
||
return {
|
||
"pending": sum(s["pending"] for s in stage_stats.values()),
|
||
"processing": sum(s["processing"] for s in stage_stats.values()),
|
||
"failed": sum(s["failed"] for s in stage_stats.values()),
|
||
}
|
||
|
||
|
||
def compose_overview(
|
||
stage_stats: dict[str, dict],
|
||
inflow_buckets: dict[str, int],
|
||
done_buckets: dict[str, int],
|
||
current_rows: list[dict],
|
||
*,
|
||
now_kst: datetime,
|
||
) -> dict:
|
||
"""수집된 통계 → 응답 dict (계약 shape). 순수 함수 — DB 불요."""
|
||
return {
|
||
"machines": build_machines(stage_stats, current_rows),
|
||
"stages": build_stages(stage_stats),
|
||
"summarize_eta": build_summarize_eta(stage_stats),
|
||
"trend_24h": build_trend(inflow_buckets, done_buckets, now_kst),
|
||
"totals": build_totals(stage_stats),
|
||
}
|
||
|
||
|
||
# ─── SQL 수집부 (총 4쿼리) ────────────────────────────────────────────────────
|
||
|
||
# 1) stage×status 집계 + 시간창 완료/유입 + 보류 (1방)
|
||
_STAGE_STATS_SQL = """
|
||
SELECT
|
||
stage,
|
||
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
|
||
COUNT(*) FILTER (WHERE status = 'processing') AS processing,
|
||
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '1 hour') AS done_1h,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > :kst_midnight) AS done_today,
|
||
COUNT(*) FILTER (WHERE status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '15 minutes') AS done_15m,
|
||
COUNT(*) FILTER (WHERE status = 'pending'
|
||
AND payload ->> 'deferred_until' IS NOT NULL
|
||
AND (payload ->> 'deferred_until')::timestamptz > NOW())
|
||
AS deferred_pending,
|
||
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') AS created_1h,
|
||
MIN(created_at) FILTER (WHERE status = 'pending') AS oldest_pending_at
|
||
FROM processing_queue
|
||
GROUP BY stage
|
||
"""
|
||
|
||
# 2/3) summarize 24h 추이 — KST 시간 버킷 (inflow/done 각 1방)
|
||
_TREND_INFLOW_SQL = """
|
||
SELECT to_char(date_trunc('hour', created_at AT TIME ZONE 'Asia/Seoul'),
|
||
'YYYY-MM-DD HH24:00') AS bucket,
|
||
COUNT(*) AS n
|
||
FROM processing_queue
|
||
WHERE stage = 'summarize'
|
||
AND created_at > NOW() - INTERVAL '24 hours'
|
||
GROUP BY 1
|
||
"""
|
||
|
||
_TREND_DONE_SQL = """
|
||
SELECT to_char(date_trunc('hour', completed_at AT TIME ZONE 'Asia/Seoul'),
|
||
'YYYY-MM-DD HH24:00') AS bucket,
|
||
COUNT(*) AS n
|
||
FROM processing_queue
|
||
WHERE stage = 'summarize'
|
||
AND status = 'completed'
|
||
AND completed_at > NOW() - INTERVAL '24 hours'
|
||
GROUP BY 1
|
||
"""
|
||
|
||
# 4) processing 행 + 표시용 제목 재료 (1방 — 머신별 2건 슬라이스는 판정부에서)
|
||
_CURRENT_SQL = """
|
||
SELECT q.stage, q.document_id, d.title, d.original_filename, d.file_path
|
||
FROM processing_queue q
|
||
JOIN documents d ON d.id = q.document_id
|
||
WHERE q.status = 'processing'
|
||
ORDER BY q.started_at DESC NULLS LAST
|
||
LIMIT 50
|
||
"""
|
||
|
||
|
||
async def build_overview(session: AsyncSession) -> dict:
|
||
"""4쿼리 수집 → compose_overview 판정 → 응답 dict."""
|
||
now_kst = datetime.now(KST)
|
||
kst_midnight = now_kst.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
|
||
stage_rows = (
|
||
await session.execute(text(_STAGE_STATS_SQL), {"kst_midnight": kst_midnight})
|
||
).all()
|
||
inflow_rows = (await session.execute(text(_TREND_INFLOW_SQL))).all()
|
||
done_rows = (await session.execute(text(_TREND_DONE_SQL))).all()
|
||
current_result = (await session.execute(text(_CURRENT_SQL))).all()
|
||
|
||
current_rows = [
|
||
{
|
||
"stage": row[0],
|
||
"document_id": row[1],
|
||
"title": row[2],
|
||
"original_filename": row[3],
|
||
"file_path": row[4],
|
||
}
|
||
for row in current_result
|
||
]
|
||
|
||
result = compose_overview(
|
||
rows_to_stage_stats(stage_rows),
|
||
{row[0]: int(row[1]) for row in inflow_rows},
|
||
{row[0]: int(row[1]) for row in done_rows},
|
||
current_rows,
|
||
now_kst=now_kst,
|
||
)
|
||
# 큐 밖 관리 스크립트(백필 등) = background_jobs (migration 357). 테이블 부재 시 graceful([]).
|
||
result["background_jobs"] = await _fetch_background_jobs(session)
|
||
return result
|
||
|
||
|
||
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = nas(오케스트레이션 호스트).
|
||
_BG_JOB_MACHINE = {
|
||
"global_digest": "macmini",
|
||
"morning_briefing": "macmini",
|
||
"section_summary": "macmini",
|
||
"hier_backfill": "nas",
|
||
"hier_redecompose": "nas",
|
||
}
|
||
|
||
|
||
_BACKGROUND_JOBS_SQL = """
|
||
SELECT id, kind, label, state, processed, total,
|
||
EXTRACT(EPOCH FROM (now() - started_at))::int AS elapsed_sec,
|
||
(state = 'running' AND updated_at < now() - interval '5 minutes') AS stale,
|
||
error
|
||
FROM background_jobs
|
||
WHERE state = 'running' OR finished_at > now() - interval '6 hours'
|
||
ORDER BY (state = 'running') DESC, started_at DESC
|
||
LIMIT 20
|
||
"""
|
||
|
||
|
||
async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
|
||
"""running + 최근 6h 완료 background_jobs. 테이블 없거나 오류면 [] (보드 무영향).
|
||
|
||
요청 세션과 **별도 connection**으로 조회한다 — 테이블 부재(마이그 357 미적용 등) 시
|
||
SELECT 실패가 요청 세션의 트랜잭션을 오염시키지 않도록 물리적으로 분리(실패 시 그
|
||
임시 connection만 폐기). 관측은 부가 기능이라 보드 본체를 절대 깨면 안 된다.
|
||
"""
|
||
try:
|
||
async with session.bind.connect() as conn: # 풀에서 독립 connection
|
||
rows = (await conn.execute(text(_BACKGROUND_JOBS_SQL))).mappings().all()
|
||
except Exception: # noqa: BLE001 — 관측 부가, 보드 본체 보호
|
||
return []
|
||
return [
|
||
{
|
||
"id": r["id"], "kind": r["kind"], "label": r["label"], "state": r["state"],
|
||
"processed": int(r["processed"] or 0), "total": r["total"],
|
||
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
|
||
"error": r["error"],
|
||
"machine": _BG_JOB_MACHINE.get(r["kind"], "nas"),
|
||
}
|
||
for r in rows
|
||
]
|
||
|
||
|
||
# ─── 실패 처리 (plan ds-board-engines-1) ─────────────────────────────────────
|
||
# 실패 = 자동 재시도(max_attempts=3) 소진 후 영구 정지 상태. 여기 함수들은
|
||
# 사용자 명시 조치 전용 — 자동 호출 경로 없음 (보드 실패 드로어가 유일 호출자).
|
||
|
||
# 실패 행은 completed_at 이 비어 있을 수 있어(소비자 실패 경로가 미기록)
|
||
# started_at 을 시각 fallback 으로 쓴다.
|
||
_FAILED_LIST_SQL = """
|
||
SELECT q.id, q.stage, q.document_id, q.attempts, q.max_attempts,
|
||
q.error_message,
|
||
COALESCE(q.completed_at, q.started_at) AS failed_at,
|
||
d.title, d.original_filename, d.file_path
|
||
FROM processing_queue q
|
||
JOIN documents d ON d.id = q.document_id
|
||
WHERE q.status = 'failed'
|
||
ORDER BY q.stage, COALESCE(q.completed_at, q.started_at) DESC NULLS LAST
|
||
LIMIT 300
|
||
"""
|
||
|
||
# 재시도: failed → pending (attempts 리셋 = 자동 재시도 3회 새로 부여).
|
||
# error_message 는 감사용으로 보존 — 성공 시 완료 행에 남아도 무해.
|
||
# uq_queue_active((doc,stage) pending/processing 부분 유니크)와 충돌하는 행 —
|
||
# 같은 문서·단계가 이미 재enqueue 된 경우 — 는 건드리지 않고 건수만 보고.
|
||
_RETRY_SQL = """
|
||
UPDATE processing_queue q
|
||
SET status = 'pending', attempts = 0,
|
||
started_at = NULL, completed_at = NULL
|
||
WHERE q.id IN :ids
|
||
AND q.status = 'failed'
|
||
AND NOT EXISTS (
|
||
SELECT 1 FROM processing_queue p
|
||
WHERE p.document_id = q.document_id
|
||
AND p.stage = q.stage
|
||
AND p.status IN ('pending', 'processing')
|
||
AND p.id <> q.id
|
||
)
|
||
RETURNING q.id
|
||
"""
|
||
|
||
# 건너뛰기: failed → completed + payload 마킹 (감사 추적).
|
||
# enqueue_next_stage 는 의도적으로 호출하지 않는다 — 실패 문서(빈 텍스트 등)가
|
||
# 하류 단계로 흘러가는 것 방지. 후속 단계가 필요하면 재시도가 정상 경로.
|
||
_SKIP_SQL = """
|
||
UPDATE processing_queue
|
||
SET status = 'completed', completed_at = NOW(),
|
||
payload = COALESCE(payload, '{}'::jsonb)
|
||
|| jsonb_build_object('skipped_by_user', true,
|
||
'skipped_at', NOW()::text)
|
||
WHERE id IN :ids AND status = 'failed'
|
||
RETURNING id
|
||
"""
|
||
|
||
|
||
async def fetch_failed_items(session: AsyncSession) -> list[dict]:
|
||
"""영구 실패 행 목록 (문서 제목 포함, 최대 300건)."""
|
||
rows = (await session.execute(text(_FAILED_LIST_SQL))).all()
|
||
return [
|
||
{
|
||
"id": r[0],
|
||
"stage": r[1],
|
||
"document_id": r[2],
|
||
"attempts": int(r[3] or 0),
|
||
"max_attempts": int(r[4] or 0),
|
||
"error_message": r[5],
|
||
"failed_at": r[6],
|
||
"title": display_title({
|
||
"document_id": r[2],
|
||
"title": r[7],
|
||
"original_filename": r[8],
|
||
"file_path": r[9],
|
||
}),
|
||
}
|
||
for r in rows
|
||
]
|
||
|
||
|
||
async def retry_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||
"""failed → pending 복귀. not_retried = active 충돌 + 이미 failed 아님."""
|
||
unique_ids = list(set(ids))
|
||
stmt = text(_RETRY_SQL).bindparams(bindparam("ids", expanding=True))
|
||
retried = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||
await session.commit()
|
||
return {
|
||
"requested": len(unique_ids),
|
||
"retried": len(retried),
|
||
"not_retried": len(unique_ids) - len(retried),
|
||
}
|
||
|
||
|
||
async def skip_failed(session: AsyncSession, ids: list[int]) -> dict:
|
||
"""failed → completed(건너뛰기 마킹). 후속 단계 연쇄 없음."""
|
||
unique_ids = list(set(ids))
|
||
stmt = text(_SKIP_SQL).bindparams(bindparam("ids", expanding=True))
|
||
skipped = (await session.execute(stmt, {"ids": unique_ids})).all()
|
||
await session.commit()
|
||
return {
|
||
"requested": len(unique_ids),
|
||
"skipped": len(skipped),
|
||
"not_skipped": len(unique_ids) - len(skipped),
|
||
}
|