cd0040925a
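Following the cancellation of the MacBook LLM plan and the re-decision on the Mac mini model, put all of DS's generative-LLM consumption on hold. held = classify/summarize/deep_summary (queue; no claim occurs, attempts are not consumed) + digest (04:00)/briefing (05:10) cron + study explanation/session_analysis/memo_card consumers. GPU-specific stages, collectors, and interactive paths (ask/eid chat) are unaffected. Default [] = no-op. /api/digest/regenerate explicitly returns 409 while the hold is active. To release: empty held_stages in config and restart fastapi.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>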
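The hold rule described in the commit message can be sketched as follows. This is a minimal illustration, not the project's code: `PipelineSettings`, `is_held`, and `consume` are hypothetical names, and only `pipeline_held_stages` and the stage name `study_session_analysis` come from the file below.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PipelineSettings:
    """Hypothetical stand-in for the project's settings object."""

    # Empty by default, so no stage is held and behaviour is unchanged.
    pipeline_held_stages: list[str] = field(default_factory=list)


def is_held(stage: str, settings: PipelineSettings) -> bool:
    """Return True when the stage is listed in pipeline_held_stages."""
    return stage in settings.pipeline_held_stages


def consume(stage: str, settings: PipelineSettings, pending: list[int]) -> list[int]:
    """Claim every pending job id unless the stage is held.

    While held, nothing is claimed: the queue is left untouched,
    so no attempts are consumed.
    """
    if is_held(stage, settings):
        return []
    claimed = list(pending)
    pending.clear()
    return claimed
```

With the default empty list the consumer claims work as usual; once the stage name is added to the list, the same call returns nothing and leaves the queue as it was, which is why releasing the hold only needs the list emptied and the process restarted.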
84 lines
3.2 KiB
Python
"""Phase 4-B v1 study_quiz_session_jobs consumer, run by APScheduler every minute.

Session-level jobs only: kept separate from study_question_jobs (4-A) so that
operational SQL stays unambiguous.
BATCH_SIZE=1. The MLX gate Semaphore(1) is shared with 4-A, so this consumer
waits in line while 4-A is processing.
STALE_MINUTES=10 provides self-recovery.
"""

from __future__ import annotations

from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError

from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
from workers.study_session_analysis_worker import run_session_analysis_job

logger = setup_logger("study_session_queue_consumer")

BATCH_SIZE = 1
STALE_MINUTES = 10


async def reset_stale_session_jobs() -> None:
    """Reset jobs stuck in processing for STALE_MINUTES or longer back to pending."""
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
    try:
        async with async_session() as session:
            stmt = (
                update(StudyQuizSessionJob)
                .where(
                    StudyQuizSessionJob.status == "processing",
                    StudyQuizSessionJob.started_at.is_not(None),
                    StudyQuizSessionJob.started_at < cutoff,
                )
                .values(status="pending", started_at=None)
            )
            result = await session.execute(stmt)
            await session.commit()
            n = result.rowcount or 0
            if n > 0:
                logger.warning("study_session_jobs_stale_reset count=%s", n)
    except SQLAlchemyError as e:
        logger.exception("study_session_jobs_stale_reset_failed: %s", e)


async def consume_study_session_queue() -> None:
    """APScheduler entry point. Process up to BATCH_SIZE pending session_jobs."""
    # Generative-LLM hold: do not claim at all (logged at debug because this runs every minute).
    if "study_session_analysis" in settings.pipeline_held_stages:
        logger.debug("study_session_analysis held (pipeline.held_stages)")
        return
    await reset_stale_session_jobs()

    async with async_session() as session:
        rows = (
            await session.execute(
                select(StudyQuizSessionJob)
                .where(StudyQuizSessionJob.status == "pending")
                .order_by(StudyQuizSessionJob.id.asc())
                .limit(BATCH_SIZE)
            )
        ).scalars().all()

    for job_row in rows:
        async with async_session() as s:
            try:
                job = await s.get(StudyQuizSessionJob, job_row.id)
                if job is None or job.status != "pending":
                    continue
                await run_session_analysis_job(s, job)
                await s.commit()
                logger.info(
                    "session_analysis_processed id=%s sid=%s status=%s error_code=%s attempts=%s",
                    job.id, job.study_quiz_session_id, job.status, job.error_code,
                    job.attempts,
                )
            except Exception as e:
                await s.rollback()
                logger.exception("session_analysis_outer_failed job_id=%s: %s", job_row.id, e)