ops(pipeline): 생성 LLM 홀드 게이트 held_stages — 맥미니 모델 확정까지 보류

맥북 LLM 백지화 + 맥미니 모델 재결정에 따라 DS 의 생성 LLM 소비를 일괄 보류.
held = classify/summarize/deep_summary(큐, claim 미발생·attempts 미소모) +
digest(04:00)/briefing(05:10) cron + study explanation/session_analysis/memo_card 컨슈머.
GPU 특화 스테이지·수집기·인터랙티브(ask/eid chat)는 무영향. 기본값 [] = 무동작.
/api/digest/regenerate 는 홀드 중 409 명시. 해제 = config held_stages 비우고 fastapi 재기동.
exec plan: ~/.claude/plans/ds-llm-hold-exec-20260611.md

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-11 16:52:46 +09:00
parent fdac449a48
commit cd0040925a
10 changed files with 238 additions and 0 deletions
+8
View File
@@ -244,7 +244,15 @@ async def regenerate(
user: Annotated[User, Depends(require_admin)],
):
"""수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요)."""
from core.config import settings
from workers.digest_worker import run
# 홀드 중 silent no-op 방지 — 워커 게이트와 동일 조건을 표면에서 명시.
if "digest" in settings.pipeline_held_stages:
raise HTTPException(
status_code=409,
detail="global_digest 보류 중 (config.yaml pipeline.held_stages) — 해제 후 재시도",
)
asyncio.create_task(run())
return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"}
+16
View File
@@ -158,6 +158,13 @@ class Settings(BaseModel):
# 업로드 한도 (authoritative policy)
upload: UploadConfig = UploadConfig()
# 생성 LLM 홀드 (2026-06-11): config.yaml pipeline.held_stages 에 든 이름의
# 컨슈머/워커는 claim 자체를 하지 않는다 (attempts 미소모, pending 적체 = 의도).
# 유효 키 = 큐 stage 명(classify/summarize/deep_summary) + cron/컨슈머 키(digest,
# briefing, study_explanation, study_session_analysis, study_memo_card).
# 빈 리스트 = 무동작 (기존 동작 그대로).
pipeline_held_stages: list[str] = []
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
@@ -244,6 +251,14 @@ def load_settings() -> Settings:
)
)
pipeline_held_stages: list[str] = []
if config_path.exists() and raw and "pipeline" in raw:
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
if not isinstance(held_raw, (list, tuple)):
held_raw = [held_raw]
pipeline_held_stages = [str(s) for s in held_raw]
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
upload_cfg = (
@@ -272,6 +287,7 @@ def load_settings() -> Settings:
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
internal_worker_token=internal_worker_token,
pipeline_held_stages=pipeline_held_stages,
)
+4
View File
@@ -8,6 +8,7 @@
import asyncio
from datetime import date
from core.config import settings
from core.utils import setup_logger
from services.briefing.pipeline import run_briefing_pipeline
@@ -22,6 +23,9 @@ async def run(target_date: date | None = None) -> dict | None:
Args:
target_date: KST 기준 briefing_date (None = 오늘). API regenerate 가 명시 지정 가능.
"""
if "briefing" in settings.pipeline_held_stages:
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
return None
try:
result = await asyncio.wait_for(
run_briefing_pipeline(target_date),
+4
View File
@@ -10,6 +10,7 @@ global_digests / digest_topics 테이블에 저장한다.
import asyncio
from core.config import settings
from core.utils import setup_logger
from services.digest.pipeline import run_digest_pipeline
@@ -24,6 +25,9 @@ async def run() -> None:
pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리).
여기서는 전체 hard cap 만 강제.
"""
if "digest" in settings.pipeline_held_stages:
logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip")
return
try:
result = await asyncio.wait_for(
run_digest_pipeline(),
+12
View File
@@ -13,12 +13,16 @@ from sqlalchemy import select, update, delete, exists
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import aliased
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition
logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
@@ -335,14 +339,22 @@ async def _process_stage(stage, worker_fn):
async def consume_queue():
"""메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리."""
global _hold_logged
workers = _load_workers()
held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages]
if held and not _hold_logged:
logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)")
_hold_logged = True
try:
await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("stale reset failed, but continuing queue consumption")
for stage in MAIN_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
@@ -14,6 +14,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_memo_card_job import StudyMemoCardJob
@@ -50,6 +51,10 @@ async def reset_stale_card_jobs() -> None:
async def consume_study_memo_card_queue() -> None:
"""APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_memo_card" in settings.pipeline_held_stages:
logger.debug("study_memo_card 보류 (pipeline.held_stages)")
return
await reset_stale_card_jobs()
async with async_session() as session:
+5
View File
@@ -59,6 +59,11 @@ async def reset_stale_study_jobs() -> None:
async def consume_study_queue() -> None:
"""APScheduler 진입점. pending job BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트.
# pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관).
if "study_explanation" in settings.pipeline_held_stages:
logger.debug("study_explanation 보류 (pipeline.held_stages)")
return
await reset_stale_study_jobs()
async with async_session() as session:
@@ -12,6 +12,7 @@ from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.study_quiz_session_job import StudyQuizSessionJob
@@ -48,6 +49,10 @@ async def reset_stale_session_jobs() -> None:
async def consume_study_session_queue() -> None:
"""APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리."""
# 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug).
if "study_session_analysis" in settings.pipeline_held_stages:
logger.debug("study_session_analysis 보류 (pipeline.held_stages)")
return
await reset_stale_session_jobs()
async with async_session() as session:
+11
View File
@@ -176,3 +176,14 @@ schedule:
daily_digest: "20:00"
file_watcher_interval_minutes: 5
queue_consumer_interval_minutes: 10
# 생성 LLM 홀드 (2026-06-11, 사용자 지시): 맥미니 모델 확정까지 생성 LLM 소비 스테이지 보류.
# - 큐: classify(triage)/summarize/deep_summary — claim 자체를 안 함 (attempts 미소모, pending 적체 = 의도)
# - cron/컨슈머: digest(global 04:00), briefing(05:10), study_explanation/study_session_analysis/
# study_memo_card (1분 컨슈머)
# - 무영향: extract/embed/chunk/markdown/stt/preview/thumbnail/fulltext, 수집기 전부,
# 인터랙티브(ask/eid chat), daily_digest(LLM 미사용)
# 유효 키 = 위 8개 — 그 외 문자열은 무동작(오타 주의). 해제 = held_stages: [] 후 fastapi 재기동.
pipeline:
held_stages: ["classify", "summarize", "deep_summary", "digest", "briefing",
"study_explanation", "study_session_analysis", "study_memo_card"]
+168
View File
@@ -0,0 +1,168 @@
"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트.
홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉).
비-held 스테이지는 기존과 동일하게 처리된다.
"""
import pytest
from core.config import Settings, settings
from workers import digest_worker, queue_consumer
def _fake_consumer_env(monkeypatch, held):
processed = []
async def fake_process(stage, worker):
processed.append(stage)
async def fake_reset(stages, threshold):
return None
monkeypatch.setattr(queue_consumer, "_process_stage", fake_process)
monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset)
monkeypatch.setattr(
queue_consumer, "_load_workers",
lambda: {s: object() for s in queue_consumer.MAIN_QUEUE_STAGES + ["markdown"]},
)
monkeypatch.setattr(queue_consumer, "_hold_logged", False)
monkeypatch.setattr(settings, "pipeline_held_stages", held)
return processed
def test_settings_default_empty():
"""미설정 시 빈 리스트 = 무동작 (기존 동작 무회귀)."""
assert Settings().pipeline_held_stages == []
@pytest.mark.asyncio
async def test_consume_queue_skips_held_stages(monkeypatch):
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary"]
)
await queue_consumer.consume_queue()
assert "classify" not in processed
assert "summarize" not in processed
assert "deep_summary" not in processed
# GPU/특화 스테이지는 계속 처리
for stage in ("extract", "embed", "chunk", "stt", "fulltext"):
assert stage in processed
@pytest.mark.asyncio
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_queue()
assert processed == list(queue_consumer.MAIN_QUEUE_STAGES)
@pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch):
"""markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""
processed = _fake_consumer_env(
monkeypatch, ["classify", "summarize", "deep_summary", "digest"]
)
await queue_consumer.consume_markdown_queue()
assert processed == ["markdown"]
@pytest.mark.asyncio
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])
await digest_worker.run()
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
called = {"pipeline": False}
async def fake_pipeline():
called["pipeline"] = True
return {"clusters": 0}
monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", [])
await digest_worker.run()
assert called["pipeline"] is True
@pytest.mark.asyncio
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
from workers import briefing_worker
called = {"pipeline": False}
async def fake_pipeline(target_date):
called["pipeline"] = True
return {}
monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])
assert await briefing_worker.run() is None
assert called["pipeline"] is False
@pytest.mark.asyncio
async def test_study_explanation_consumer_held(monkeypatch):
from workers import study_queue_consumer
touched = []
async def fake_reset():
touched.append("reset")
monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])
await study_queue_consumer.consume_study_queue()
assert touched == []
@pytest.mark.asyncio
async def test_study_consumers_held_no_db_touch(monkeypatch):
"""held 시 stale reset 포함 DB 접근 0 — claim 미발생 실증."""
from workers import study_memo_card_jobs_consumer, study_session_queue_consumer
touched = []
async def fake_reset_session():
touched.append("session_reset")
async def fake_reset_card():
touched.append("card_reset")
monkeypatch.setattr(
study_session_queue_consumer, "reset_stale_session_jobs", fake_reset_session
)
monkeypatch.setattr(
study_memo_card_jobs_consumer, "reset_stale_card_jobs", fake_reset_card
)
monkeypatch.setattr(
settings, "pipeline_held_stages",
["study_session_analysis", "study_memo_card"],
)
await study_session_queue_consumer.consume_study_session_queue()
await study_memo_card_jobs_consumer.consume_study_memo_card_queue()
assert touched == []