235bbf9881
사용자 '공평하게 동일한 작업' 지적의 비대칭 잔재 2건 + 예고된 배칭 레버: - queue_drain --stage classify (use_deep: deep 슬롯 endpoint + triage sampling, 완료 시 enqueue_next_stage 로 embed/chunk/markdown 연쇄 — DAG 단절 방지) - deep_summary consumer = 맥북 우선, 불가 시 맥미니 primary 즉시 처리(동일 모델 — 강등 아님). drain 은 defer_on_deep_unavailable=True 로 기존 보류-종료 유지 - llm_gate capacity 일반화 (config pipeline.mlx_gate_concurrency, 기본 1, 운영 2) — 'MLX_CONCURRENCY=1 고정' 영구 룰의 전제(single-inference 서버) 소멸을 docstring 에 개정 박제 - analyze_events FK(users) CLI 컨텍스트 INSERT 실패 fix (models.user 명시 import) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
196 lines
9.2 KiB
Python
196 lines
9.2 KiB
Python
"""수동 burst-drain CLI — 맥미니 백로그를 사용자가 의도적으로 맥북(M5 Max)으로 소화.
|
|
|
|
ds-macbook-offload-1 P2-3. 운영 패턴 = csb_collector --bulk 와 동일 (컨테이너 내 실행,
|
|
장기 배치 중 fastapi 재생성 = in-flight 절단이지만 멱등 재실행으로 무손실).
|
|
|
|
docker compose exec fastapi python -m workers.queue_drain --stage summarize --limit 200
|
|
|
|
설계 원칙:
|
|
- deep 슬롯(config.yaml ai.models.deep) 필수 — 부재 시 명시 종료 (silent 강등 금지)
|
|
- claim = FOR UPDATE SKIP LOCKED 단건 전이 → consumer(1분 주기)와 이중처리 0
|
|
- per-item 커밋 = sleep-안전: 중단돼도 완료분 무손상, 진행 중 1건만 stale recovery
|
|
(10분) 로 pending 복귀. 재실행 멱등 (summarize 는 ai_summary 존재 시 skip)
|
|
- 보류(StageDeferred = 맥북 sleep/cold/editor_busy/네트워크 플랩): attempts 반환 +
|
|
deferred_until 백오프 기록. 연속 보류 --defer-retries(기본 5)회까지 --defer-wait
|
|
(기본 120s) 간격 재시도(분 단위 플랩 흡수), 한도 도달 = sleep 판정으로 run 종료 —
|
|
불가 상태의 맥북을 계속 두드리지 않는다
|
|
- 폴백 0: 맥미니/cloud 강등 없음
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.queue import ProcessingQueue, StageDeferred, not_deferred_condition
|
|
|
|
logger = setup_logger("queue_drain")
|
|
|
|
# summarize = 맥미니 백로그 본체 / deep_summary = 심층 / classify = triage 분류.
|
|
# classify 는 2026-06-12 fair-share 로 합류 — 구 제외 사유(plan Q-4 "triage 경량 = 맥미니
|
|
# 적합")는 Gemma a4b(42 tok/s) 전제. Qwen 27B 전환 후 classify 가 장문 프리필로 컨슈머
|
|
# 사이클을 점유하는 최대 병목이라, 맥북(프리필 ~5배)이 가장 효과적인 분담처다.
|
|
# classify 완료 시 enqueue_next_stage(embed/chunk/markdown) 필수 — 누락 = DAG 단절.
|
|
DRAIN_STAGES = ("summarize", "deep_summary", "classify")
|
|
|
|
|
|
async def _claim_one(stage: str) -> tuple[int, int] | None:
|
|
"""pending 1건을 processing 으로 원자 전이 (SKIP LOCKED — consumer 와 경합 안전)."""
|
|
async with async_session() as session:
|
|
item = (await session.execute(
|
|
select(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.stage == stage,
|
|
ProcessingQueue.status == "pending",
|
|
not_deferred_condition(),
|
|
)
|
|
.order_by(ProcessingQueue.created_at)
|
|
.limit(1)
|
|
.with_for_update(skip_locked=True)
|
|
)).scalar_one_or_none()
|
|
if item is None:
|
|
return None
|
|
item.status = "processing"
|
|
item.started_at = datetime.now(timezone.utc)
|
|
item.attempts += 1
|
|
claimed = (item.id, item.document_id)
|
|
await session.commit()
|
|
return claimed
|
|
|
|
|
|
async def _mark_completed(queue_id: int) -> None:
|
|
async with async_session() as session:
|
|
item = await session.get(ProcessingQueue, queue_id)
|
|
if item:
|
|
item.status = "completed"
|
|
item.completed_at = datetime.now(timezone.utc)
|
|
await session.commit()
|
|
|
|
|
|
async def _mark_deferred(queue_id: int, defer: StageDeferred) -> None:
|
|
"""보류: attempts 반환(미소모) + deferred_until 백오프 — consumer 의 처리와 동형."""
|
|
async with async_session() as session:
|
|
item = await session.get(ProcessingQueue, queue_id)
|
|
if item:
|
|
item.status = "pending"
|
|
item.started_at = None
|
|
item.attempts = max(0, item.attempts - 1)
|
|
until = datetime.now(timezone.utc) + timedelta(minutes=defer.retry_after_minutes)
|
|
item.payload = {**(item.payload or {}), "deferred_until": until.isoformat()}
|
|
await session.commit()
|
|
|
|
|
|
async def _mark_failed(queue_id: int, exc: Exception) -> None:
|
|
"""실패: consumer 와 동일 재시도 정책 (attempts >= max → failed, 아니면 pending 복귀)."""
|
|
async with async_session() as session:
|
|
item = await session.get(ProcessingQueue, queue_id)
|
|
if item:
|
|
err_text = str(exc) or repr(exc) or type(exc).__name__
|
|
item.error_message = err_text[:500]
|
|
if item.attempts >= item.max_attempts:
|
|
item.status = "failed"
|
|
else:
|
|
item.status = "pending"
|
|
item.started_at = None
|
|
await session.commit()
|
|
|
|
|
|
async def drain(stage: str, limit: int, defer_retries: int = 5, defer_wait: int = 120) -> None:
|
|
if stage not in DRAIN_STAGES:
|
|
raise SystemExit(f"--stage 는 {DRAIN_STAGES} 만 허용")
|
|
if settings.ai.deep is None:
|
|
raise SystemExit(
|
|
"config.yaml ai.models.deep 슬롯 미구성 — drain 은 맥북 분담 전용 레버라 진행하지 않음"
|
|
" (맥미니로의 silent 강등 금지)"
|
|
)
|
|
|
|
from workers.classify_worker import process as classify_process
|
|
from workers.deep_summary_worker import process as deep_summary_process
|
|
from workers.queue_consumer import enqueue_next_stage
|
|
from workers.summarize_worker import process as summarize_process
|
|
|
|
done = failed = 0
|
|
deferred = False
|
|
consecutive_defers = 0
|
|
while done + failed < limit:
|
|
claimed = await _claim_one(stage)
|
|
if claimed is None:
|
|
logger.info(f"[drain:{stage}] pending 소진 — 종료")
|
|
break
|
|
queue_id, document_id = claimed
|
|
try:
|
|
async with async_session() as worker_session:
|
|
if stage == "summarize":
|
|
await summarize_process(document_id, worker_session, use_deep=True)
|
|
elif stage == "classify":
|
|
await classify_process(document_id, worker_session, use_deep=True)
|
|
else:
|
|
# deep_summary: drain 은 맥북 전용 레버 — 불가 시 보류(폴백은 consumer 만)
|
|
await deep_summary_process(
|
|
document_id, worker_session, defer_on_deep_unavailable=True
|
|
)
|
|
await worker_session.commit()
|
|
await _mark_completed(queue_id)
|
|
# 다음 stage 연쇄 — classify 는 embed/chunk/markdown enqueue (consumer 와 동형,
|
|
# summarize/deep_summary 는 next_stages 미등록이라 no-op)
|
|
await enqueue_next_stage(document_id, stage)
|
|
done += 1
|
|
consecutive_defers = 0
|
|
logger.info(f"[drain:{stage}] {done}/{limit} doc={document_id} 완료")
|
|
except StageDeferred as defer:
|
|
# 일시 불가는 종류가 둘: 진짜 sleep(장시간) vs 일시 네트워크 플랩(분 단위 —
|
|
# 2026-06-11 실측: Tailscale direct 경로 ~10분 플랩으로 32/300 조기 종료).
|
|
# 연속 보류 한도까지 대기 후 재시도해 플랩을 흡수, 한도 도달 시 종료(sleep 판정).
|
|
await _mark_deferred(queue_id, defer)
|
|
consecutive_defers += 1
|
|
if consecutive_defers >= defer_retries:
|
|
deferred = True
|
|
logger.warning(
|
|
f"[drain:{stage}] doc={document_id} 맥북 불가({defer}) — 연속 보류 "
|
|
f"{consecutive_defers}회 한도 도달, run 종료. 맥북 깨운 뒤(또는 "
|
|
f"{defer.retry_after_minutes}분 후) 재실행"
|
|
)
|
|
break
|
|
logger.warning(
|
|
f"[drain:{stage}] doc={document_id} 맥북 일시 불가({defer}) — "
|
|
f"{defer_wait}s 대기 후 재시도 ({consecutive_defers}/{defer_retries})"
|
|
)
|
|
await asyncio.sleep(defer_wait)
|
|
except Exception as exc:
|
|
await _mark_failed(queue_id, exc)
|
|
failed += 1
|
|
logger.error(f"[drain:{stage}] doc={document_id} 실패: {exc}")
|
|
|
|
# 종료 요약 (잔여 = 지금 시점 pending 수)
|
|
async with async_session() as session:
|
|
from sqlalchemy import func as sa_func
|
|
remaining = (await session.execute(
|
|
select(sa_func.count()).select_from(ProcessingQueue).where(
|
|
ProcessingQueue.stage == stage, ProcessingQueue.status == "pending",
|
|
)
|
|
)).scalar_one()
|
|
logger.info(
|
|
f"[drain:{stage}] 요약 — 완료 {done} · 실패 {failed} · "
|
|
f"보류종료 {'예' if deferred else '아니오'} · 잔여 pending {remaining}"
|
|
)
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="맥북(M5 Max) burst-drain — 수동 백로그 분담 레버")
|
|
parser.add_argument("--stage", required=True, choices=DRAIN_STAGES)
|
|
parser.add_argument("--limit", type=int, default=50, help="이번 run 최대 처리 건수 (기본 50)")
|
|
parser.add_argument("--defer-retries", type=int, default=5,
|
|
help="연속 보류 허용 횟수 — 네트워크 플랩 흡수 (기본 5, 한도 도달 시 종료)")
|
|
parser.add_argument("--defer-wait", type=int, default=120,
|
|
help="보류 재시도 간 대기 초 (기본 120)")
|
|
args = parser.parse_args()
|
|
asyncio.run(drain(args.stage, args.limit, args.defer_retries, args.defer_wait))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|