feat(workers): drain 연속보류 내성 — 네트워크 플랩 흡수 (--defer-retries/--defer-wait)
실측 origin: Tailscale direct 경로 ~10분 플랩(13:25~13:34)으로 300건 run 이 32건에서 조기 종료. 보류 시멘틱 자체는 정상(무손상) — run 지속성만 보강. 연속 보류 5회까지 120s 간격 재시도, 한도 도달 = sleep 판정 종료. 성공 시 카운터 리셋. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -10,8 +10,10 @@ ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 와 동일 (컨
|
|||||||
- claim = FOR UPDATE SKIP LOCKED 단건 전이 → consumer(1분 주기)와 이중처리 0
|
- claim = FOR UPDATE SKIP LOCKED 단건 전이 → consumer(1분 주기)와 이중처리 0
|
||||||
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 중 1건만 stale recovery
|
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 중 1건만 stale recovery
|
||||||
(10분) 로 pending 복귀. 재실행 멱등 (summarize 는 ai_summary 존재 시 skip)
|
(10분) 로 pending 복귀. 재실행 멱등 (summarize 는 ai_summary 존재 시 skip)
|
||||||
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy): attempts 반환 + deferred_until
|
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy/네트워크 플랩): attempts 반환 +
|
||||||
백오프 기록 후 run 즉시 종료 — 불가 상태의 맥북을 계속 두드리지 않는다
|
deferred_until 백오프 기록. 연속 보류 --defer-retries(기본 5)회까지 --defer-wait
|
||||||
|
(기본 120s) 간격 재시도(분 단위 플랩 흡수), 한도 도달 = sleep 판정으로 run 종료 —
|
||||||
|
불가 상태의 맥북을 계속 두드리지 않는다
|
||||||
- 폴백 0: 맥미니/cloud 강등 없음
|
- 폴백 0: 맥미니/cloud 강등 없음
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -94,7 +96,7 @@ async def _mark_failed(queue_id: int, exc: Exception) -> None:
|
|||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
async def drain(stage: str, limit: int) -> None:
|
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
|
||||||
if stage not in DRAIN_STAGES:
|
if stage not in DRAIN_STAGES:
|
||||||
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
|
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)")
|
||||||
if settings.ai.deep is None:
|
if settings.ai.deep is None:
|
||||||
@@ -108,6 +110,7 @@ async def drain(stage: str, limit: int) -> None:
|
|||||||
|
|
||||||
done = failed = 0
|
done = failed = 0
|
||||||
deferred = False
|
deferred = False
|
||||||
|
consecutive_defers = 0
|
||||||
while done + failed < limit:
|
while done + failed < limit:
|
||||||
claimed = await _claim_one(stage)
|
claimed = await _claim_one(stage)
|
||||||
if claimed is None:
|
if claimed is None:
|
||||||
@@ -124,15 +127,27 @@ async def drain(stage: str, limit: int) -> None:
|
|||||||
await worker_session.commit()
|
await worker_session.commit()
|
||||||
await _mark_completed(queue_id)
|
await _mark_completed(queue_id)
|
||||||
done += 1
|
done += 1
|
||||||
|
consecutive_defers = 0
|
||||||
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
|
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
|
||||||
except StageDeferred as defer:
|
except StageDeferred as defer:
|
||||||
|
# 일시 불가는 종류가 둘: 진짜 sleep(장시간) vs 일시 네트워크 플랩(분 단위 —
|
||||||
|
# 2026-06-11 실측: Tailscale direct 경로 ~10분 플랩으로 32/300 조기 종료).
|
||||||
|
# 연속 보류 한도까지 대기 후 재시도해 플랩을 흡수, 한도 도달 시 종료(sleep 판정).
|
||||||
await _mark_deferred(queue_id, defer)
|
await _mark_deferred(queue_id, defer)
|
||||||
deferred = True
|
consecutive_defers += 1
|
||||||
|
if consecutive_defers >= defer_retries:
|
||||||
|
deferred = True
|
||||||
|
logger.warning(
|
||||||
|
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 연속 보류 "
|
||||||
|
f"{consecutive_defers}회 한도 도달, run 종료. 맥북 깨운 뒤(또는 "
|
||||||
|
f"{defer.retry_after_minutes}분 후) 재실행"
|
||||||
|
)
|
||||||
|
break
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 보류 기록 후 run 종료. "
|
f"[drain:{stage}] doc={document_id} 맥북 일시 불가({defer}) — "
|
||||||
f"맥북 깨운 뒤(또는 {defer.retry_after_minutes}분 후) 재실행"
|
f"{defer_wait}s 대기 후 재시도 ({consecutive_defers}/{defer_retries})"
|
||||||
)
|
)
|
||||||
break
|
await asyncio.sleep(defer_wait)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
await _mark_failed(queue_id, exc)
|
await _mark_failed(queue_id, exc)
|
||||||
failed += 1
|
failed += 1
|
||||||
@@ -156,8 +171,12 @@ def main() -> None:
|
|||||||
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
|
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
|
||||||
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
|
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
|
||||||
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
|
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
|
||||||
|
parser.add_argument("--defer-retries", type=int, default=5,
|
||||||
|
help="연속 보류 허용 횟수 — 네트워크 플랩 흡수 (기본 5, 한도 도달 시 종료)")
|
||||||
|
parser.add_argument("--defer-wait", type=int, default=120,
|
||||||
|
help="보류 재시도 간 대기 초 (기본 120)")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
asyncio.run(drain(args.stage, args.limit))
|
asyncio.run(drain(args.stage, args.limit, args.defer_retries, args.defer_wait))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user