"""수동 burst-drain CLI — 맥미니 백로그를 사용자가 의도적으로 맥북(M5 Max)으로 소화. ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 와 동일 (컨테이너 내 실행, 장기 배치 중 fastapi 재생성 = in-flight 절단이지만 멱등 재실행으로 무손실). docker compose exec fastapi python -m workers.queue_drain --stage summarize --limit 200 설계 원칙: - deep 슬롯(config.yaml ai.models.deep) 필수 — 부재 시 명시 종료 (silent 강등 금지) - claim = FOR UPDATE SKIP LOCKED 단건 전이 → consumer(1분 주기)와 이중처리 0 - per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 중 1건만 stale recovery (10분) 로 pending 복귀. 재실행 멱등 (summarize 는 ai_summary 존재 시 skip) - 보류(StageDeferred = 맥북 sleep/cold/editor_busy/네트워크 플랩): attempts 반환 + deferred_until 백오프 기록. 연속 보류 --defer-retries(기본 5)회까지 --defer-wait (기본 120s) 간격 재시도(분 단위 플랩 흡수), 한도 도달 = sleep 판정으로 run 종료 — 불가 상태의 맥북을 계속 두드리지 않는다 - 폴백 0: 맥미니/cloud 강등 없음 """ import argparse import asyncio from datetime import datetime, timedelta, timezone from sqlalchemy import select from core.config import settings from core.database import async_session from core.utils import setup_logger from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition logger = setup_logger("queue_drain") # summarize = 맥미니 백로그 본체 / deep_summary = 심층 (consumer 도 deep 슬롯 시 맥북 경유). # classify 는 triage 경량 호출이라 맥미니 적합 — 대상에서 제외 (plan Q-4). DRAIN_STAGES = ("summarize", "deep_summary") async def _claim_one(stage: str) -> tuple[int, int] | None: """pending 1건을 processing 으로 원자 전이 (SKIP LOCKED — consumer 와 경합 안전).""" async with async_session() as session: item = (await session.execute( select(ProcessingQueue) .where( ProcessingQueue.stage == stage, ProcessingQueue.status == "pending", not_deferred_condition(), ) .order_by(ProcessingQueue.created_at) .limit(1) .with_for_update(skip_locked=True) )).scalar_one_or_none() if item is None: return None item.status = "processing" item.started_at = datetime.now(timezone.utc) item.attempts += 1 claimed = (item.id, item.document_id) await session.commit() return claimed async def _mark_completed(queue_id: int) -> None: async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if item: item.status = "completed" item.completed_at = datetime.now(timezone.utc) await session.commit() async def _mark_deferred(queue_id: int, defer: StageDeferred) -> None: """보류: attempts 반환(미소모) + deferred_until 백오프 — consumer 의 처리와 동형.""" async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if item: item.status = "pending" item.started_at = None item.attempts = max(0, item.attempts - 1) until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes) item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()} await session.commit() async def _mark_failed(queue_id: int, exc: Exception) -> None: """실패: consumer 와 동일 재시도 정책 (attempts >= max → failed, 아니면 pending 복귀).""" async with async_session() as session: item = await session.get(ProcessingQueue, queue_id) if item: err_text = str(exc) or repr(exc) or type(exc).__name__ item.error_message = err_text[:500] if item.attempts >= item.max_attempts: item.status = "failed" else: item.status = "pending" item.started_at = None await session.commit() async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None: if stage not in DRAIN_STAGES: raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용 (classify 등은 맥미니 적합 — plan Q-4)") if settings.ai.deep is None: raise SystemExit( "config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음" " (맥미니로의 silent 강등 금지)" ) from workers.deep_summary_worker import process as deep_summary_process from workers.summarize_worker import process as summarize_process done = failed = 0 deferred = False consecutive_defers = 0 while done + failed < limit: claimed = await _claim_one(stage) if claimed is None: logger.info(f"[drain:{stage}] pending 소진 — 종료") break queue_id, document_id = claimed try: async with async_session() as worker_session: if stage == "summarize": await summarize_process(document_id, worker_session, use_deep=True) else: # deep_summary 는 deep 슬롯 구성 시 워커가 자체적으로 맥북 경유 await deep_summary_process(document_id, worker_session) await worker_session.commit() await _mark_completed(queue_id) done += 1 consecutive_defers = 0 logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료") except StageDeferred as defer: # 일시 불가는 종류가 둘: 진짜 sleep(장시간) vs 일시 네트워크 플랩(분 단위 — # 2026-06-11 실측: Tailscale direct 경로 ~10분 플랩으로 32/300 조기 종료). # 연속 보류 한도까지 대기 후 재시도해 플랩을 흡수, 한도 도달 시 종료(sleep 판정). await _mark_deferred(queue_id, defer) consecutive_defers += 1 if consecutive_defers >= defer_retries: deferred = True logger.warning( f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 연속 보류 " f"{consecutive_defers}회 한도 도달, run 종료. 맥북 깨운 뒤(또는 " f"{defer.retry_after_minutes}분 후) 재실행" ) break logger.warning( f"[drain:{stage}] doc={document_id} 맥북 일시 불가({defer}) — " f"{defer_wait}s 대기 후 재시도 ({consecutive_defers}/{defer_retries})" ) await asyncio.sleep(defer_wait) except Exception as exc: await _mark_failed(queue_id, exc) failed += 1 logger.error(f"[drain:{stage}] doc={document_id} 실패: {exc}") # 종료 요약 (잔여 = 지금 시점 pending 수) async with async_session() as session: from sqlalchemy import func as sa_func remaining = (await session.execute( select(sa_func.count()).select_from(ProcessingQueue).where( ProcessingQueue.stage == stage, ProcessingQueue.status == "pending", ) )).scalar_one() logger.info( f"[drain:{stage}] 요약 — 완료 {done} · 실패 {failed} · " f"보류종료 {'예' if deferred else '아니오'} · 잔여 pending {remaining}" ) def main() -> None: parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버") parser.add_argument("--stage", required=True, choices=DRAIN_STAGES) parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)") parser.add_argument("--defer-retries", type=int, default=5, help="연속 보류 허용 횟수 — 네트워크 플랩 흡수 (기본 5, 한도 도달 시 종료)") parser.add_argument("--defer-wait", type=int, default=120, help="보류 재시도 간 대기 초 (기본 120)") args = parser.parse_args() asyncio.run(drain(args.stage, args.limit, args.defer_retries, args.defer_wait)) if __name__ == "__main__": main()