From 371ee4ebe67c9fdfe62d42d0c5749b6d6a2f1266 Mon Sep 17 00:00:00 2001 From: hyungi Date: Fri, 3 Jul 2026 08:14:12 +0900 Subject: [PATCH] =?UTF-8?q?feat(presegment):=20=EC=98=81=EA=B5=AC=20?= =?UTF-8?q?=EC=8B=A4=ED=8C=A8=20Chat=20=EC=95=8C=EB=9E=8C=20=E2=80=94=20AL?= =?UTF-8?q?ERT=5FFAIL=5FSTAGES=20allowlist(=EA=B8=B0=EB=B3=B8=20deep=5Fsum?= =?UTF-8?q?mary,summarize)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/workers/queue_consumer.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index a72acd2..3b1e308 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -23,6 +23,15 @@ logger = setup_logger("queue_consumer") # pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만. _hold_logged = False +# PR3 후속(2026-07-03): 영구 실패 알람 — 사람이 개입해야 풀리는 상태라 Chat 웹훅 발화. +# allowlist 로 소음 제한: embed/chunk 류 대량 배치가 일제히 실패하면 문서 수만큼 알람이 +# 쏟아지므로, 건당 가치가 높고 발생률이 낮은 LLM 스테이지만 기본 대상으로 한다. +_ALERT_FAIL_STAGES = { + s.strip() + for s in os.getenv("ALERT_FAIL_STAGES", "deep_summary,summarize").split(",") + if s.strip() +} + # stage별 배치 크기 # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. @@ -353,6 +362,8 @@ async def _process_stage(stage, worker_fn): except Exception as e: # 실패 처리 + permanently_failed = False + doc_title = None async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if not item: @@ -363,7 +374,16 @@ async def _process_stage(stage, worker_fn): item.error_message = err_text[:500] if item.attempts >= item.max_attempts: item.status = "failed" + permanently_failed = True logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") + if stage in _ALERT_FAIL_STAGES: + # 알람용 제목 best-effort — 실패해도 알람 자체는 발화한다. + try: + from models.document import Document + _doc = await session.get(Document, document_id) + doc_title = getattr(_doc, "title", None) + except Exception: + doc_title = None # B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다. # 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형 # batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan @@ -385,6 +405,18 @@ async def _process_stage(stage, worker_fn): item.started_at = None logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") await session.commit() + if permanently_failed and stage in _ALERT_FAIL_STAGES: + # 영구 실패 = 무인 파이프라인이 스스로 못 푸는 상태 → 유인 전환 알람. + # send_alert 는 절대 raise 하지 않음(no-op/실패 = False 반환뿐). + from services.alerts import send_alert + await send_alert( + f"[DS] {stage} 영구 실패 — doc {document_id}", + ( + f"{doc_title or '(제목 미상)'}\n" + f"에러: {err_text[:300]}\n" + f"확인: scripts/presegment_attended.py list (보류/거부 사유) 또는 큐 재큐" + ), + ) async def consume_queue():