fix(search): sync doc md_status to failed on permanent markdown queue failure

marker_worker 는 변환 시작 시 doc.md_status=processing 으로 표시하는데, 변환이
_fail()/_set_skipped() 를 거치지 않고 예외(예: 대형 batch ReadTimeout)로 죽으면
queue_consumer 가 큐 행만 failed 처리하고 doc.md_status 는 processing 에 영구 고착
= orphan (큐 failed, 문서 processing). markdown consumer 분리 후 이 orphan 이
tail 재처리에서 재발(5149/5201)하여 근본 원인 차단.

_process_stage except 블록에서 큐 항목이 영구 실패(attempts>=max)할 때 stage가
markdown 이고 doc.md_status=processing 이면 failed 로 동기화. 재시도 중
(attempts<max)엔 pending 큐 행이 남아 orphan 아니므로 미터치.

검증: synthetic 영구 실패 경로 → md_status processing→failed 동기화 PASS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-24 12:06:32 +00:00
parent 2edc80d4bb
commit 0854c72c70
+16
View File
@@ -279,6 +279,22 @@ async def _process_stage(stage, worker_fn):
if item.attempts >= item.max_attempts:
item.status = "failed"
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
# (큐는 failed, 문서는 processing). 큐가 영구 failed 가 될 때 doc 상태도
# 동기화한다. 재시도 중(attempts<max)엔 pending 큐 행이 남아 orphan 아님.
if stage == "markdown":
from models.document import Document
doc = await session.get(Document, document_id)
if doc is not None and doc.md_status == "processing":
doc.md_status = "failed"
if not doc.md_extraction_error:
doc.md_extraction_error = err_text[:500]
logger.warning(
f"[markdown] document_id={document_id} "
f"md_status processing→failed 동기화 (B3 orphan 방지)"
)
else:
item.status = "pending"
item.started_at = None