From c11f113cf101fb3064ff391251c8a66104dd8208 Mon Sep 17 00:00:00 2001 From: hyungi Date: Tue, 16 Jun 2026 13:24:25 +0900 Subject: [PATCH] =?UTF-8?q?fix(workers):=20silent=20completion=20=EC=B0=A8?= =?UTF-8?q?=EB=8B=A8=20=E2=80=94=20transient=20re-raise=20+=20enqueue=20?= =?UTF-8?q?=EA=B2=A9=EB=A6=AC=20(R3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit worker_fn 이 transient 실패를 삼켜 정상 반환하면 queue_consumer 가 status=completed 로 확정 → 영구 데이터 손실 + 재시도/추적 0. 정본(extract/marker/fulltext/stt 는 re-raise)과 어긋난 곳을 통일: - deep_summary: 호출 실패(call_failed)를 삼키지 않고 raise → 재시도→failed dead-letter (이전엔 ai_detail_summary 영구 누락 + tier triage 고착). - thumbnail: _extract_thumbnail 실패를 silent return → raise (썸네일 영구 누락 방지). - queue_consumer: 완료 커밋 후 enqueue_next_stage(정상·skip-note 2곳)를 자체 try 로 격리 — enqueue 실패가 outer except 로 전파돼 completed 항목을 재오픈(stage 재실행) 하던 결함 차단. 실패는 ERROR 로 가시화. - broad except 에 asyncio.CancelledError 명시 통과(embed worker / ask classifier·verifier). dead-letter = ProcessingQueue.status='failed'(기존 attempts/max_attempts 머신 재사용, 신규 컬럼 불필요). 검증: py_compile 통과. 큐 재시도 의미 synthetic smoke(staging) 예정. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/api/search.py | 8 ++++++-- app/workers/deep_summary_worker.py | 8 ++++++-- app/workers/queue_consumer.py | 20 ++++++++++++++++++-- app/workers/study_question_embed_worker.py | 4 +++- app/workers/thumbnail_worker.py | 7 ++++++- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/app/api/search.py b/app/api/search.py index 2cd43ed..d70c5af 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -710,7 +710,9 @@ async def ask( # 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도. try: classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0) - except (asyncio.TimeoutError, Exception): + except asyncio.CancelledError: + raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception: classifier_result = ClassifierResult("timeout", None, [], [], 0.0) defense_log["classifier"] = { @@ -872,7 +874,9 @@ async def ask( # → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align. try: verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0) - except (asyncio.TimeoutError, Exception): + except asyncio.CancelledError: + raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception: verifier_result = VerifierResult("timeout", [], 0.0) # Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계) diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index 728c181..077ff54 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -144,9 +144,13 @@ async def process( logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)") raise except Exception as exc: + # 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가 + # attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면 + # worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 + + # tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치. + # 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전). logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}") - parse_error = "call_failed" - raw = "" + raise finally: await client.close() diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 4903ff1..c7c2ebb 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -275,7 +275,15 @@ async def _process_stage(stage, worker_fn): item.status = "completed" item.completed_at = datetime.now(timezone.utc) await skip_session.commit() - await enqueue_next_stage(document_id, stage) + # 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈 + # 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리). + try: + await enqueue_next_stage(document_id, stage) + except Exception as enq_err: + logger.error( + f"[{stage}] document_id={document_id} skip(note) 완료됐으나 " + f"다음 단계 enqueue 실패: {enq_err}" + ) logger.info(f"[{stage}] document_id={document_id} skip (note)") continue @@ -293,7 +301,15 @@ async def _process_stage(stage, worker_fn): item.completed_at = datetime.now(timezone.utc) await session.commit() - await enqueue_next_stage(document_id, stage) + # 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면 + # completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복 + # + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3). + try: + await enqueue_next_stage(document_id, stage) + except Exception as enq_err: + logger.error( + f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}" + ) logger.info(f"[{stage}] document_id={document_id} 완료") except StageDeferred as defer: diff --git a/app/workers/study_question_embed_worker.py b/app/workers/study_question_embed_worker.py index 016da7b..28fdbf1 100644 --- a/app/workers/study_question_embed_worker.py +++ b/app/workers/study_question_embed_worker.py @@ -102,7 +102,9 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo try: async with asyncio.timeout(EMBED_TIMEOUT_S): vec = await client.embed(text) - except (asyncio.TimeoutError, Exception) as e: + except asyncio.CancelledError: + raise # 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception as e: logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e) # 실패 — status='failed'. 직전 embedding 보존. q.embedding_status = "failed" diff --git a/app/workers/thumbnail_worker.py b/app/workers/thumbnail_worker.py index 89bd3eb..8a1b360 100644 --- a/app/workers/thumbnail_worker.py +++ b/app/workers/thumbnail_worker.py @@ -121,7 +121,12 @@ async def process(document_id: int, session: AsyncSession) -> None: ok = _extract_thumbnail(source, output, seek) if not ok: - return + # 썸네일 추출 실패(ffmpeg)는 삼키지 않고 raise (R3) — queue_consumer 가 attempts + # 소진까지 재시도 후 status=failed 로 가시화. silent return 이면 큐가 completed 로 + # 확정 + 썸네일 영구 누락 + 재시도/추적 0 (silent skip). 손상 영상이면 failed 로 안착. + raise RuntimeError( + f"thumbnail 추출 실패: document_id={document_id} source={source}" + ) doc.thumbnail_path = str(output) doc.updated_at = datetime.now(timezone.utc)