"""Phase 4-A ``study_question_jobs`` consumer, run by APScheduler every minute.

Dedicated queue for the study domain (kept separate from ``processing_queue``).
``BATCH_SIZE`` is 1 because MLX 26B shares the same ``Semaphore(1)`` gate as
deep_summary, which keeps GPU load under control.

Worker dispatch per job kind:

- ``kind='explanation'``: ``study_explanation_worker.run_explanation_job``
  (Phase 4-A).
- ``kind='session_summary'``: reserved for Phase 4-B and not implemented in the
  first iteration; a pending row of this kind is marked ``skipped`` right away.
"""

from __future__ import annotations

from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError

from core.database import async_session
from core.config import settings
from core.utils import setup_logger
from models.study_question_job import StudyQuestionJob
from workers.study_explanation_worker import run_explanation_job

logger = setup_logger("study_queue_consumer")

# One row per cycle: serializes on the MLX gate and keeps GPU load predictable.
BATCH_SIZE = 1
# A job left in 'processing' for STALE_MINUTES or longer is treated as belonging
# to a dead worker and is put back to 'pending'.
STALE_MINUTES = 10


async def reset_stale_study_jobs() -> None:
    """Put jobs stuck in ``processing`` for ``STALE_MINUTES`` back to ``pending``.

    The partial unique index covers (qid, kind) WHERE status IN
    ('pending', 'processing'), and enqueue uses on_conflict_do_nothing, so a
    second pending row for the same qid is not expected to exist. The recovery
    is a single bulk UPDATE; any ``SQLAlchemyError`` raised by it is logged and
    swallowed so the caller's cycle can continue.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
    try:
        async with async_session() as session:
            stmt = (
                update(StudyQuestionJob)
                .where(
                    StudyQuestionJob.status == "processing",
                    StudyQuestionJob.started_at.is_not(None),
                    StudyQuestionJob.started_at < cutoff,
                )
                .values(status="pending", started_at=None)
            )
            result = await session.execute(stmt)
            await session.commit()
            n = result.rowcount or 0
            if n > 0:
                logger.warning("study_jobs_stale_reset count=%s", n)
    except SQLAlchemyError as e:
        logger.exception("study_jobs_stale_reset_failed: %s", e)


def _mark_skipped(job: StudyQuestionJob, message: str) -> None:
    """Flag ``job`` as skipped with ``message``; the caller commits the session."""
    job.status = "skipped"
    job.error_code = "unknown"
    job.error_message = message
    job.completed_at = datetime.now(timezone.utc)


async def consume_study_queue() -> None:
    """APScheduler entry point: process up to ``BATCH_SIZE`` pending jobs.

    Does nothing while ``"study_explanation"`` is listed in
    ``settings.pipeline_held_stages``. Otherwise it first recovers stale
    ``processing`` rows, then handles the oldest pending rows (lowest id
    first), each inside its own session.
    """
    # Generation-LLM hold: a self-contained gate, separate from the env flag
    # (study_explanation_enabled). Pending rows are left untouched (the Mac mini
    # derived-worker pickup path is unrelated to this gate).
    if "study_explanation" in settings.pipeline_held_stages:
        logger.debug("study_explanation 보류 (pipeline.held_stages)")
        return

    await reset_stale_study_jobs()

    pending_stmt = select(StudyQuestionJob).where(
        StudyQuestionJob.status == "pending"
    )
    if not settings.study_explanation_enabled:
        # This host does not own explanation jobs, and they must stay pending.
        # Leaving them in the candidate set would make the oldest one occupy
        # the LIMIT slot on every cycle and starve the kinds queued behind it,
        # so they are filtered out here. is_distinct_from keeps NULL kinds
        # selectable so they still reach the unknown-kind branch below.
        pending_stmt = pending_stmt.where(
            StudyQuestionJob.kind.is_distinct_from("explanation")
        )
    pending_stmt = pending_stmt.order_by(StudyQuestionJob.id.asc()).limit(BATCH_SIZE)

    async with async_session() as session:
        rows = (await session.execute(pending_stmt)).scalars().all()

    for job_row in rows:
        # Independent session per job, so one failing job cannot affect another.
        async with async_session() as s:
            try:
                # Fresh load of the same row (avoids locks, isolates sessions).
                job = await s.get(StudyQuestionJob, job_row.id)
                if job is None or job.status != "pending":
                    continue  # already handled by another cycle

                if job.kind == "explanation":
                    if not settings.study_explanation_enabled:
                        # PR-MacMini-Derived-Worker-1: explanation owner = Mac mini.
                        # status/attempts are left unchanged so the row stays
                        # pending for the Mac mini worker to pick up. Normally
                        # unreachable because of the SELECT filter above; kept
                        # as a re-check after the fresh load.
                        logger.info(
                            "skip explanation owner=macmini job_id=%s qid=%s",
                            job.id,
                            job.study_question_id,
                        )
                        continue
                    await run_explanation_job(s, job)
                elif job.kind == "session_summary":
                    # Phase 4-B not implemented: skip immediately so the row
                    # does not linger in the queue.
                    _mark_skipped(job, "session_summary not implemented in Phase 4-A")
                    logger.info(
                        "study_job_unsupported_kind id=%s kind=%s", job.id, job.kind
                    )
                else:
                    _mark_skipped(job, f"unknown kind: {job.kind!r}")
                    logger.warning(
                        "study_job_unknown_kind id=%s kind=%s", job.id, job.kind
                    )

                await s.commit()
                logger.info(
                    "study_job_processed id=%s qid=%s kind=%s status=%s error_code=%s attempts=%s",
                    job.id,
                    job.study_question_id,
                    job.kind,
                    job.status,
                    job.error_code,
                    job.attempts,
                )
            except Exception as e:
                # Catastrophic failure the worker did not handle: roll back the
                # session and let a later cycle retry. study_explanation_worker
                # has its own catch-all except, so reaching this point is rare.
                await s.rollback()
                logger.exception("study_job_outer_failed job_id=%s: %s", job_row.id, e)