c11f113cf1
worker_fn 이 transient 실패를 삼켜 정상 반환하면 queue_consumer 가 status=completed 로 확정 → 영구 데이터 손실 + 재시도/추적 0. 정본(extract/marker/fulltext/stt 는 re-raise)과 어긋난 곳을 통일: - deep_summary: 호출 실패(call_failed)를 삼키지 않고 raise → 재시도→failed dead-letter (이전엔 ai_detail_summary 영구 누락 + tier triage 고착). - thumbnail: _extract_thumbnail 실패를 silent return → raise (썸네일 영구 누락 방지). - queue_consumer: 완료 커밋 후 enqueue_next_stage(정상·skip-note 2곳)를 자체 try 로 격리 — enqueue 실패가 outer except 로 전파돼 completed 항목을 재오픈(stage 재실행) 하던 결함 차단. 실패는 ERROR 로 가시화. - broad except 에 asyncio.CancelledError 명시 통과(embed worker / ask classifier·verifier). dead-letter = ProcessingQueue.status='failed'(기존 attempts/max_attempts 머신 재사용, 신규 컬럼 불필요). 검증: py_compile 통과. 큐 재시도 의미 synthetic smoke(staging) 예정. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
450 lines
22 KiB
Python
450 lines
22 KiB
Python
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출.
|
||
|
||
PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) stage 를 별 consumer 로 분리.
|
||
대형 PDF split 변환(수십 분) 이 단일 consume_queue 코루틴을 점유해 전 파이프라인을
|
||
stall 시키던 문제 제거. consume_queue = markdown 제외 전 stage / consume_markdown_queue =
|
||
markdown 전용. 두 consumer 의 stage 집합은 disjoint 이라 같은 row 경합/중복 reset 없음.
|
||
"""
|
||
|
||
import os
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
from sqlalchemy import select, update, delete, exists
|
||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||
from sqlalchemy.orm import aliased
|
||
|
||
from core.config import settings
|
||
from core.database import async_session
|
||
from core.utils import setup_logger
|
||
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
|
||
|
||
logger = setup_logger("queue_consumer")
|
||
|
||
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
|
||
_hold_logged = False
|
||
|
||
# stage별 배치 크기
|
||
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
||
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
||
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
|
||
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
|
||
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
|
||
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
|
||
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
|
||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
|
||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
|
||
"fulltext": 3}
|
||
STALE_THRESHOLD_MINUTES = 10
|
||
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
|
||
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
|
||
# stale 임계로 보면 살아있는 변환을 stale 로 오인 → pending 복구해 이중 처리한다.
|
||
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
|
||
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
|
||
|
||
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
|
||
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
|
||
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
|
||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||
MAIN_QUEUE_STAGES = [
|
||
"extract", "classify", "summarize",
|
||
"preview", "stt", "thumbnail", "fulltext",
|
||
]
|
||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||
|
||
# 2026-06-15: deep_summary(26B, 콜당 70~300s)를 메인 루프에서 분리 (markdown/fast 선례).
|
||
# 단일 deep 호출이 1분 틱을 초과해 메인 consume_queue 가 영구 coalesce 되고 extract/
|
||
# classify 등 경량 stage 까지 굶던 문제 제거. 집합 disjoint(자기 집합만 stale reset).
|
||
DEEP_QUEUE_STAGES = ["deep_summary"]
|
||
|
||
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
|
||
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
|
||
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
|
||
FAST_QUEUE_STAGES = ["embed", "chunk"]
|
||
|
||
|
||
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
|
||
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
|
||
|
||
1) 같은 (document_id, stage)에 pending 행이 이미 있으면
|
||
stale processing 행은 중복이므로 삭제
|
||
2) pending이 없는 stale processing 행만 pending으로 되돌림
|
||
|
||
stages: 이 reset 의 대상 stage 목록. consume_queue 와 consume_markdown_queue 가
|
||
서로 disjoint 한 stage 집합 + 서로 다른 threshold 로 호출한다.
|
||
"""
|
||
cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minutes)
|
||
processing_row = aliased(ProcessingQueue)
|
||
pending_row = aliased(ProcessingQueue)
|
||
|
||
try:
|
||
async with async_session() as session:
|
||
# Step A: pending 중복이 이미 있는 stale processing 삭제
|
||
delete_stmt = (
|
||
delete(ProcessingQueue)
|
||
.where(
|
||
ProcessingQueue.id.in_(
|
||
select(processing_row.id).where(
|
||
processing_row.stage.in_(stages),
|
||
processing_row.status == "processing",
|
||
processing_row.started_at.is_not(None),
|
||
processing_row.started_at < cutoff,
|
||
exists(
|
||
select(1).where(
|
||
pending_row.document_id == processing_row.document_id,
|
||
pending_row.stage == processing_row.stage,
|
||
pending_row.status == "pending",
|
||
)
|
||
),
|
||
)
|
||
)
|
||
)
|
||
)
|
||
delete_result = await session.execute(delete_stmt)
|
||
|
||
# Step B: pending 없는 stale processing만 pending으로 복구
|
||
recoverable_ids = select(processing_row.id).where(
|
||
processing_row.stage.in_(stages),
|
||
processing_row.status == "processing",
|
||
processing_row.started_at.is_not(None),
|
||
processing_row.started_at < cutoff,
|
||
~exists(
|
||
select(1).where(
|
||
pending_row.document_id == processing_row.document_id,
|
||
pending_row.stage == processing_row.stage,
|
||
pending_row.status == "pending",
|
||
)
|
||
),
|
||
)
|
||
update_stmt = (
|
||
update(ProcessingQueue)
|
||
.where(ProcessingQueue.id.in_(recoverable_ids))
|
||
.values(status="pending", started_at=None)
|
||
)
|
||
update_result = await session.execute(update_stmt)
|
||
|
||
await session.commit()
|
||
|
||
deleted = delete_result.rowcount or 0
|
||
recovered = update_result.rowcount or 0
|
||
if deleted > 0:
|
||
logger.warning(
|
||
"deleted %s stale processing rows that already had pending duplicates (stages=%s)",
|
||
deleted, stages,
|
||
)
|
||
if recovered > 0:
|
||
logger.warning(
|
||
"recovered %s stale processing rows back to pending (stages=%s)",
|
||
recovered, stages,
|
||
)
|
||
except IntegrityError:
|
||
logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
|
||
except SQLAlchemyError:
|
||
logger.exception("reset_stale_items failed with database error; skipping this cycle")
|
||
except Exception:
|
||
logger.exception("reset_stale_items failed unexpectedly; skipping this cycle")
|
||
|
||
|
||
async def enqueue_next_stage(document_id: int, current_stage: str):
|
||
"""현재 stage 완료 후 다음 stage를 pending으로 등록.
|
||
|
||
§3 추가:
|
||
stt → [classify] (audio 는 extract 건너뛰고 stt 가 extracted_text 를 채움)
|
||
thumbnail → [] (video 는 leaf — classify/embed 없음)
|
||
|
||
Web/Blog ingest (devonagent 트랙) — plan db-snuggly-petal.md:
|
||
source_channel='devonagent' 인 doc 의 extract 완료 시
|
||
classify/preview/markdown 전부 SKIP → [embed, chunk] 만 enqueue.
|
||
AI 가공 (ai_tldr/ai_bullets 등) 은 별 PR (Mac mini derived-worker).
|
||
"""
|
||
# source_channel-aware override (extract stage 만). source_channel 누락 시 _default.
|
||
extract_override_by_channel = {
|
||
"devonagent": ["embed", "chunk"],
|
||
# crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 —
|
||
# 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지.
|
||
"crawl": ["classify"],
|
||
}
|
||
|
||
next_stages = {
|
||
"extract": ["classify", "preview"],
|
||
"classify": ["embed", "chunk", "markdown"],
|
||
"stt": ["classify"],
|
||
}
|
||
|
||
# extract 의 경우만 doc.source_channel 을 lookup 해서 override 적용
|
||
if current_stage == "extract":
|
||
from models.document import Document
|
||
async with async_session() as lookup_session:
|
||
doc = await lookup_session.get(Document, document_id)
|
||
sc = doc.source_channel if doc else None
|
||
if sc in extract_override_by_channel:
|
||
stages = extract_override_by_channel[sc]
|
||
else:
|
||
stages = next_stages.get(current_stage, [])
|
||
else:
|
||
stages = next_stages.get(current_stage, [])
|
||
|
||
if not stages:
|
||
return
|
||
|
||
async with async_session() as session:
|
||
for next_stage in stages:
|
||
await enqueue_stage(session, document_id, next_stage)
|
||
await session.commit()
|
||
|
||
|
||
def _load_workers():
|
||
"""stage → worker process 함수 dict (lazy import — 순환 import 회피)."""
|
||
from workers.classify_worker import process as classify_process
|
||
from workers.chunk_worker import process as chunk_process
|
||
from workers.deep_summary_worker import process as deep_summary_process
|
||
from workers.embed_worker import process as embed_process
|
||
from workers.extract_worker import process as extract_process
|
||
from workers.preview_worker import process as preview_process
|
||
from workers.stt_worker import process as stt_process
|
||
from workers.summarize_worker import process as summarize_process
|
||
from workers.thumbnail_worker import process as thumbnail_process
|
||
from workers.marker_worker import process as marker_process
|
||
from workers.fulltext_worker import process as fulltext_process
|
||
|
||
return {
|
||
"extract": extract_process,
|
||
"classify": classify_process,
|
||
"summarize": summarize_process,
|
||
"embed": embed_process,
|
||
"chunk": chunk_process,
|
||
"preview": preview_process,
|
||
"stt": stt_process,
|
||
"thumbnail": thumbnail_process,
|
||
# PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성.
|
||
# next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립).
|
||
"deep_summary": deep_summary_process,
|
||
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
|
||
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
|
||
"markdown": marker_process,
|
||
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
|
||
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
|
||
"fulltext": fulltext_process,
|
||
}
|
||
|
||
|
||
async def _process_stage(stage, worker_fn):
|
||
"""단일 stage 의 pending 항목을 batch 만큼 가져와 워커 실행.
|
||
|
||
consume_queue / consume_markdown_queue 가 공유한다. 항목별 독립 세션 +
|
||
processing→completed/failed 상태 전이 + 재시도 정책은 기존 로직 그대로.
|
||
"""
|
||
batch_size = BATCH_SIZE.get(stage, 3)
|
||
|
||
# pending 항목 조회 (보류 백오프 deferred_until 미래 항목 제외 — ds-macbook-offload-1)
|
||
async with async_session() as session:
|
||
result = await session.execute(
|
||
select(ProcessingQueue.id, ProcessingQueue.document_id)
|
||
.where(
|
||
ProcessingQueue.stage == stage,
|
||
ProcessingQueue.status == "pending",
|
||
not_deferred_condition(),
|
||
)
|
||
.order_by(ProcessingQueue.created_at)
|
||
.limit(batch_size)
|
||
)
|
||
pending_items = result.all()
|
||
|
||
# 각 항목을 독립 세션에서 처리
|
||
for queue_id, document_id in pending_items:
|
||
# 상태를 processing으로 변경
|
||
async with async_session() as session:
|
||
item = await session.get(ProcessingQueue, queue_id)
|
||
if not item or item.status != "pending":
|
||
continue
|
||
item.status = "processing"
|
||
item.started_at = datetime.now(timezone.utc)
|
||
item.attempts += 1
|
||
await session.commit()
|
||
|
||
# 워커 실행 (독립 세션)
|
||
try:
|
||
# note(메모)는 이미 extracted_text가 있으므로 extract/preview skip
|
||
if stage in ("extract", "preview"):
|
||
from models.document import Document
|
||
async with async_session() as check_session:
|
||
doc = await check_session.get(Document, document_id)
|
||
if doc and doc.file_type == "note":
|
||
async with async_session() as skip_session:
|
||
item = await skip_session.get(ProcessingQueue, queue_id)
|
||
if item:
|
||
item.status = "completed"
|
||
item.completed_at = datetime.now(timezone.utc)
|
||
await skip_session.commit()
|
||
# 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈
|
||
# 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리).
|
||
try:
|
||
await enqueue_next_stage(document_id, stage)
|
||
except Exception as enq_err:
|
||
logger.error(
|
||
f"[{stage}] document_id={document_id} skip(note) 완료됐으나 "
|
||
f"다음 단계 enqueue 실패: {enq_err}"
|
||
)
|
||
logger.info(f"[{stage}] document_id={document_id} skip (note)")
|
||
continue
|
||
|
||
async with async_session() as worker_session:
|
||
await worker_fn(document_id, worker_session)
|
||
await worker_session.commit()
|
||
|
||
# 완료 처리
|
||
async with async_session() as session:
|
||
item = await session.get(ProcessingQueue, queue_id)
|
||
if not item:
|
||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||
continue
|
||
item.status = "completed"
|
||
item.completed_at = datetime.now(timezone.utc)
|
||
await session.commit()
|
||
|
||
# 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면
|
||
# completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복
|
||
# + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3).
|
||
try:
|
||
await enqueue_next_stage(document_id, stage)
|
||
except Exception as enq_err:
|
||
logger.error(
|
||
f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}"
|
||
)
|
||
logger.info(f"[{stage}] document_id={document_id} 완료")
|
||
|
||
except StageDeferred as defer:
|
||
# 보류 (ds-macbook-offload-1): 맥북 일시 불가(sleep/cold/editor_busy) — 실패 아님.
|
||
# attempts 는 claim 시 선증가분을 반환(미소모)하고 deferred_until 백오프 후 자연 재개.
|
||
# 워커는 완주 전 doc 쓰기를 하지 않으므로 이 시점의 데이터 변경 = 0 (sleep-안전).
|
||
async with async_session() as session:
|
||
item = await session.get(ProcessingQueue, queue_id)
|
||
if not item:
|
||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||
continue
|
||
item.status = "pending"
|
||
item.started_at = None
|
||
item.attempts = max(0, item.attempts - 1)
|
||
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
|
||
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
|
||
await session.commit()
|
||
logger.info(
|
||
f"[{stage}] document_id={document_id} 보류({defer}) — "
|
||
f"{defer.retry_after_minutes}분 후 재개"
|
||
)
|
||
|
||
except Exception as e:
|
||
# 실패 처리
|
||
async with async_session() as session:
|
||
item = await session.get(ProcessingQueue, queue_id)
|
||
if not item:
|
||
logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
|
||
continue
|
||
# 빈 메시지 방어: str → repr → 클래스명 순 fallback
|
||
err_text = str(e) or repr(e) or type(e).__name__
|
||
item.error_message = err_text[:500]
|
||
if item.attempts >= item.max_attempts:
|
||
item.status = "failed"
|
||
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
|
||
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
|
||
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
|
||
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
|
||
# (큐는 failed, 문서는 processing). 큐가 영구 failed 가 될 때 doc 상태도
|
||
# 동기화한다. 재시도 중(attempts<max)엔 pending 큐 행이 남아 orphan 아님.
|
||
if stage == "markdown":
|
||
from models.document import Document
|
||
doc = await session.get(Document, document_id)
|
||
if doc is not None and doc.md_status == "processing":
|
||
doc.md_status = "failed"
|
||
if not doc.md_extraction_error:
|
||
doc.md_extraction_error = err_text[:500]
|
||
logger.warning(
|
||
f"[markdown] document_id={document_id} "
|
||
f"md_status processing→failed 동기화 (B3 orphan 방지)"
|
||
)
|
||
else:
|
||
item.status = "pending"
|
||
item.started_at = None
|
||
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
||
await session.commit()
|
||
|
||
|
||
async def consume_queue():
|
||
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
|
||
global _hold_logged
|
||
workers = _load_workers()
|
||
|
||
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
|
||
if held and not _hold_logged:
|
||
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
|
||
_hold_logged = True
|
||
|
||
try:
|
||
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||
except Exception:
|
||
logger.exception("stale reset failed, but continuing queue consumption")
|
||
|
||
for stage in MAIN_QUEUE_STAGES:
|
||
if stage in settings.pipeline_held_stages:
|
||
continue
|
||
await _process_stage(stage, workers[stage])
|
||
|
||
|
||
async def consume_fast_queue():
|
||
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
|
||
|
||
main 루프는 classify/summarize/deep 가 사이클을 분 단위로 점유해 건당 <1s 짜리
|
||
embed/chunk 가 사이클당 1번씩만 기회를 얻었다 (실효 ~60건/시 = 적체 원인).
|
||
분리 후 = 1분 잡 × 배치 10 → 캡 ~600건/시. APScheduler max_instances=1 이라
|
||
배치가 1분을 넘으면 다음 fire 는 coalesce (폭주 방지).
|
||
"""
|
||
workers = _load_workers()
|
||
|
||
try:
|
||
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||
except Exception:
|
||
logger.exception("fast stale reset failed, but continuing queue consumption")
|
||
|
||
for stage in FAST_QUEUE_STAGES:
|
||
if stage in settings.pipeline_held_stages:
|
||
continue
|
||
await _process_stage(stage, workers[stage])
|
||
|
||
|
||
async def consume_markdown_queue():
|
||
"""markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리.
|
||
|
||
한 doc 변환이 수십 분 걸려도 메인 consume_queue 는 영향받지 않는다.
|
||
APScheduler max_instances=1(기본) 이므로 변환 진행 중엔 이 consumer 의
|
||
다음 fire 만 coalesce 된다(동시 marker 변환 2건 방지 — 의도된 직렬화).
|
||
"""
|
||
workers = _load_workers()
|
||
|
||
try:
|
||
await reset_stale_items(MARKDOWN_QUEUE_STAGES, MARKDOWN_STALE_THRESHOLD_MINUTES)
|
||
except Exception:
|
||
logger.exception("markdown stale reset failed, but continuing queue consumption")
|
||
|
||
for stage in MARKDOWN_QUEUE_STAGES:
|
||
await _process_stage(stage, workers[stage])
|
||
|
||
|
||
async def consume_deep_queue():
|
||
"""deep_summary 전용 큐 소비자 (2026-06-15) — 26B 심층요약을 메인 파이프라인과 분리.
|
||
|
||
deep_summary 1콜이 70~300s(맥미니 Qwen 27B 폴백)라 메인 consume_queue(1분 틱) 안에
|
||
있으면 매 틱이 interval 을 초과해 영구 "maximum running instances" coalesce 되고
|
||
extract/classify 등 경량 stage 까지 함께 굶었다. 분리 후 = deep 만 자기 1분 잡에서
|
||
coalesce, 나머지 메인 루프는 틱 내 완료. max_instances=1 로 동시 deep 2건은 방지.
|
||
"""
|
||
workers = _load_workers()
|
||
|
||
try:
|
||
await reset_stale_items(DEEP_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
|
||
except Exception:
|
||
logger.exception("deep stale reset failed, but continuing queue consumption")
|
||
|
||
for stage in DEEP_QUEUE_STAGES:
|
||
if stage in settings.pipeline_held_stages:
|
||
continue
|
||
await _process_stage(stage, workers[stage])
|