Files
hyungi_document_server/app/workers/study_queue_consumer.py
T
hyungi cd0040925a ops(pipeline): 생성 LLM 홀드 게이트 held_stages — 맥미니 모델 확정까지 보류
맥북 LLM 백지화 + 맥미니 모델 재결정에 따라 DS 의 생성 LLM 소비를 일괄 보류.
held = classify/summarize/deep_summary(큐, claim 미발생·attempts 미소모) +
digest(04:00)/briefing(05:10) cron + study explanation/session_analysis/memo_card 컨슈머.
GPU 특화 스테이지·수집기·인터랙티브(ask/eid chat)는 무영향. 기본값 [] = 무동작.
/api/digest/regenerate 는 홀드 중 409 명시. 해제 = config held_stages 비우고 fastapi 재기동.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 16:52:46 +09:00

120 lines
5.5 KiB
Python

"""Phase 4-A study_question_jobs consumer — APScheduler 1분 간격.
study 도메인 전용 큐 (processing_queue 와 분리). BATCH_SIZE=1 — MLX 26B 가
deep_summary 와 같은 Semaphore(1) 게이트를 공유하므로 GPU 부하 통제.
stage 별 worker dispatch:
- kind='explanation': study_explanation_worker.run_explanation_job (Phase 4-A)
- kind='session_summary': Phase 4-B 예약, 1차 미구현 — pending 만 보면 즉시 skipped 처리
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from core.config import settings
from core.utils import setup_logger
from models.study_question_job import StudyQuestionJob
from workers.study_explanation_worker import run_explanation_job
logger = setup_logger("study_queue_consumer")
# 한 사이클에 한 row 씩 — MLX 게이트 직렬화 + GPU 부하 예측 가능.
BATCH_SIZE = 1
# processing 상태로 STALE_MINUTES 이상 방치된 job 은 worker 가 죽었다고 보고 pending 복구.
STALE_MINUTES = 10
async def reset_stale_study_jobs() -> None:
"""processing 으로 STALE_MINUTES 이상 방치된 job 을 pending 으로 복구.
partial unique idx 가 (qid, kind) WHERE status IN ('pending','processing') 이라
같은 qid 의 다른 pending 이 동시에 있을 일이 거의 없음 (enqueue 가 on_conflict_do_nothing).
그래도 안전하게 update 만 — 충돌 시 IntegrityError 는 단건 단위 catch.
"""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
try:
async with async_session() as session:
stmt = (
update(StudyQuestionJob)
.where(
StudyQuestionJob.status == "processing",
StudyQuestionJob.started_at.is_not(None),
StudyQuestionJob.started_at < cutoff,
)
.values(status="pending", started_at=None)
)
result = await session.execute(stmt)
await session.commit()
n = result.rowcount or 0
if n > 0:
logger.warning("study_jobs_stale_reset count=%s", n)
except SQLAlchemyError as e:
logger.exception("study_jobs_stale_reset_failed: %s", e)
async def consume_study_queue() -> None:
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트.
# pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관).
if "study_explanation" in settings.pipeline_held_stages:
logger.debug("study_explanation 보류 (pipeline.held_stages)")
return
await reset_stale_study_jobs()
async with async_session() as session:
rows = (
await session.execute(
select(StudyQuestionJob)
.where(StudyQuestionJob.status == "pending")
.order_by(StudyQuestionJob.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
for job_row in rows:
# 각 job 마다 독립 세션 — 한 job 실패가 다른 job 에 전염 X.
async with async_session() as s:
try:
# 같은 row 를 fresh load (lock 회피, 세션 격리)
job = await s.get(StudyQuestionJob, job_row.id)
if job is None or job.status != "pending":
continue # 다른 cycle 에서 이미 처리
if job.kind == "explanation":
if not settings.study_explanation_enabled:
# PR-MacMini-Derived-Worker-1: explanation owner = Mac mini.
# status/attempts 변경하지 않고 pending 그대로 유지 → Mac mini worker 가 흡수.
logger.info("skip explanation owner=macmini job_id=%s qid=%s", job.id, job.study_question_id)
continue
await run_explanation_job(s, job)
elif job.kind == "session_summary":
# Phase 4-B 미구현 — 즉시 skipped 처리 (lost in queue 방지)
job.status = "skipped"
job.error_code = "unknown"
job.error_message = "session_summary not implemented in Phase 4-A"
job.completed_at = datetime.now(timezone.utc)
logger.info("study_job_unsupported_kind id=%s kind=%s", job.id, job.kind)
else:
job.status = "skipped"
job.error_code = "unknown"
job.error_message = f"unknown kind: {job.kind!r}"
job.completed_at = datetime.now(timezone.utc)
logger.warning("study_job_unknown_kind id=%s kind=%s", job.id, job.kind)
await s.commit()
logger.info(
"study_job_processed id=%s qid=%s kind=%s status=%s error_code=%s attempts=%s",
job.id, job.study_question_id, job.kind, job.status, job.error_code,
job.attempts,
)
except Exception as e:
# worker 안에서 잡지 못한 catastrophic 실패 — 세션 rollback 후 다음 cycle 재처리.
# study_explanation_worker 가 catch-all except 를 가지므로 여기 도달은 드묾.
await s.rollback()
logger.exception("study_job_outer_failed job_id=%s: %s", job_row.id, e)