diff --git a/app/api/search.py b/app/api/search.py index 2cd43ed..d70c5af 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -710,7 +710,9 @@ async def ask( # 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도. try: classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0) - except (asyncio.TimeoutError, Exception): + except asyncio.CancelledError: + raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception: classifier_result = ClassifierResult("timeout", None, [], [], 0.0) defense_log["classifier"] = { @@ -872,7 +874,9 @@ async def ask( # → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align. try: verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0) - except (asyncio.TimeoutError, Exception): + except asyncio.CancelledError: + raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception: verifier_result = VerifierResult("timeout", [], 0.0) # Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계) diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py index 728c181..077ff54 100644 --- a/app/workers/deep_summary_worker.py +++ b/app/workers/deep_summary_worker.py @@ -144,9 +144,13 @@ async def process( logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)") raise except Exception as exc: + # 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가 + # attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면 + # worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 + + # tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치. + # 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전). logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}") - parse_error = "call_failed" - raw = "" + raise finally: await client.close() diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 4903ff1..c7c2ebb 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -275,7 +275,15 @@ async def _process_stage(stage, worker_fn): item.status = "completed" item.completed_at = datetime.now(timezone.utc) await skip_session.commit() - await enqueue_next_stage(document_id, stage) + # 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈 + # 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리). + try: + await enqueue_next_stage(document_id, stage) + except Exception as enq_err: + logger.error( + f"[{stage}] document_id={document_id} skip(note) 완료됐으나 " + f"다음 단계 enqueue 실패: {enq_err}" + ) logger.info(f"[{stage}] document_id={document_id} skip (note)") continue @@ -293,7 +301,15 @@ async def _process_stage(stage, worker_fn): item.completed_at = datetime.now(timezone.utc) await session.commit() - await enqueue_next_stage(document_id, stage) + # 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면 + # completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복 + # + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3). + try: + await enqueue_next_stage(document_id, stage) + except Exception as enq_err: + logger.error( + f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}" + ) logger.info(f"[{stage}] document_id={document_id} 완료") except StageDeferred as defer: diff --git a/app/workers/study_question_embed_worker.py b/app/workers/study_question_embed_worker.py index 016da7b..28fdbf1 100644 --- a/app/workers/study_question_embed_worker.py +++ b/app/workers/study_question_embed_worker.py @@ -102,7 +102,9 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo try: async with asyncio.timeout(EMBED_TIMEOUT_S): vec = await client.embed(text) - except (asyncio.TimeoutError, Exception) as e: + except asyncio.CancelledError: + raise # 취소는 전파 — broad except 가 삼키지 않게 명시 (R3) + except Exception as e: logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e) # 실패 — status='failed'. 직전 embedding 보존. q.embedding_status = "failed" diff --git a/app/workers/thumbnail_worker.py b/app/workers/thumbnail_worker.py index 89bd3eb..8a1b360 100644 --- a/app/workers/thumbnail_worker.py +++ b/app/workers/thumbnail_worker.py @@ -121,7 +121,12 @@ async def process(document_id: int, session: AsyncSession) -> None: ok = _extract_thumbnail(source, output, seek) if not ok: - return + # 썸네일 추출 실패(ffmpeg)는 삼키지 않고 raise (R3) — queue_consumer 가 attempts + # 소진까지 재시도 후 status=failed 로 가시화. silent return 이면 큐가 completed 로 + # 확정 + 썸네일 영구 누락 + 재시도/추적 0 (silent skip). 손상 영상이면 failed 로 안착. + raise RuntimeError( + f"thumbnail 추출 실패: document_id={document_id} source={source}" + ) doc.thumbnail_path = str(output) doc.updated_at = datetime.now(timezone.utc)