Files
hyungi_document_server/app/workers/study_session_queue_consumer.py
hyungi cd0040925a ops(pipeline): 생성 LLM 홀드 게이트 held_stages — 맥미니 모델 확정까지 보류
맥북 LLM 백지화 + 맥미니 모델 재결정에 따라 DS 의 생성 LLM 소비를 일괄 보류.
held = classify/summarize/deep_summary(큐, claim 미발생·attempts 미소모) +
digest(04:00)/briefing(05:10) cron + study explanation/session_analysis/memo_card 컨슈머.
GPU 특화 스테이지·수집기·인터랙티브(ask/eid chat)는 무영향. 기본값 [] = 무동작.
/api/digest/regenerate 는 홀드 중 409 명시. 해제 = config held_stages 비우고 fastapi 재기동.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 16:52:46 +09:00

84 lines
3.2 KiB
Python

"""Phase 4-B v1 study_quiz_session_jobs consumer — run by APScheduler every minute.

Dedicated to session-level jobs — kept separate from study_question_jobs (4-A)
so that operational SQL stays unambiguous.
BATCH_SIZE=1; the MLX gate Semaphore(1) is shared with 4-A — while 4-A is
processing, this consumer waits its turn serially.
STALE_MINUTES=10 provides self-recovery of stuck jobs.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
from workers.study_session_analysis_worker import run_session_analysis_job
# Module-level logger for this consumer, created via the project's setup_logger helper.
logger = setup_logger("study_session_queue_consumer")
# Upper bound on how many pending jobs a single scheduler tick selects.
BATCH_SIZE = 1
# A "processing" job whose started_at is older than this many minutes is
# treated as stale and put back to "pending".
STALE_MINUTES = 10
async def reset_stale_session_jobs() -> None:
    """Put jobs that have been stuck in ``processing`` back to ``pending``.

    A job is considered stale when its status is ``"processing"``, its
    ``started_at`` is set, and ``started_at`` lies more than ``STALE_MINUTES``
    minutes before the current UTC time. Stale rows are updated to
    ``status="pending"`` with ``started_at=None`` in a single UPDATE.

    A ``SQLAlchemyError`` raised along the way is logged with its traceback
    and not propagated to the caller.
    """
    stale_before = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
    try:
        stale_filter = (
            StudyQuizSessionJob.status == "processing",
            StudyQuizSessionJob.started_at.is_not(None),
            StudyQuizSessionJob.started_at < stale_before,
        )
        requeue_stmt = (
            update(StudyQuizSessionJob)
            .where(*stale_filter)
            .values(status="pending", started_at=None)
        )
        async with async_session() as db:
            outcome = await db.execute(requeue_stmt)
            await db.commit()
            # rowcount may be None; treat that as "nothing was reset".
            reset_count = outcome.rowcount or 0
            if reset_count > 0:
                logger.warning("study_session_jobs_stale_reset count=%s", reset_count)
    except SQLAlchemyError as e:
        logger.exception("study_session_jobs_stale_reset_failed: %s", e)
        return
async def consume_study_session_queue() -> None:
    """APScheduler entry point: process up to ``BATCH_SIZE`` pending session jobs.

    Steps:
      1. Return immediately when ``"study_session_analysis"`` is listed in
         ``settings.pipeline_held_stages``.
      2. Call ``reset_stale_session_jobs()``.
      3. Select the lowest-id jobs whose status is ``"pending"``, limited to
         ``BATCH_SIZE``.
      4. For each selected job, open a separate session, re-read the row, and
         run ``run_session_analysis_job`` only if the row still exists and is
         still ``"pending"``; then commit and log the outcome.

    Any exception raised while handling a single job is rolled back and
    logged with its traceback; it is not re-raised.
    """
    # Generative-LLM hold: do not even pick up a job. This runs every minute,
    # so the notice is logged at debug level only.
    if "study_session_analysis" in settings.pipeline_held_stages:
        logger.debug("study_session_analysis 보류 (pipeline.held_stages)")
        return

    await reset_stale_session_jobs()

    oldest_pending = (
        select(StudyQuizSessionJob)
        .where(StudyQuizSessionJob.status == "pending")
        .order_by(StudyQuizSessionJob.id.asc())
        .limit(BATCH_SIZE)
    )
    async with async_session() as lookup:
        found = await lookup.execute(oldest_pending)
        pending_ids = [row.id for row in found.scalars().all()]

    for pending_id in pending_ids:
        async with async_session() as s:
            try:
                job = await s.get(StudyQuizSessionJob, pending_id)
                # Only proceed when the row is still there and still pending.
                if job is not None and job.status == "pending":
                    await run_session_analysis_job(s, job)
                    await s.commit()
                    logger.info(
                        "session_analysis_processed id=%s sid=%s status=%s error_code=%s attempts=%s",
                        job.id,
                        job.study_quiz_session_id,
                        job.status,
                        job.error_code,
                        job.attempts,
                    )
            except Exception as e:
                await s.rollback()
                logger.exception(
                    "session_analysis_outer_failed job_id=%s: %s", pending_id, e
                )