diff --git a/app/api/digest.py b/app/api/digest.py index 9333dcc..4eee567 100644 --- a/app/api/digest.py +++ b/app/api/digest.py @@ -244,7 +244,15 @@ async def regenerate( user: Annotated[User, Depends(require_admin)], ): """수동 트리거 — 백그라운드 태스크로 워커 실행 (admin 필요).""" + from core.config import settings from workers.digest_worker import run + # 홀드 중 silent no-op 방지 — 워커 게이트와 동일 조건을 표면에서 명시. + if "digest" in settings.pipeline_held_stages: + raise HTTPException( + status_code=409, + detail="global_digest 보류 중 (config.yaml pipeline.held_stages) — 해제 후 재시도", + ) + asyncio.create_task(run()) return {"status": "started", "message": "global_digest 워커 백그라운드 실행 시작"} diff --git a/app/core/config.py b/app/core/config.py index 7dade13..1bbd60e 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -158,6 +158,13 @@ class Settings(BaseModel): # 업로드 한도 (authoritative policy) upload: UploadConfig = UploadConfig() + # 생성 LLM 홀드 (2026-06-11): config.yaml pipeline.held_stages 에 든 이름의 + # 컨슈머/워커는 claim 자체를 하지 않는다 (attempts 미소모, pending 적체 = 의도). + # 유효 키 = 큐 stage 명(classify/summarize/deep_summary) + cron/컨슈머 키(digest, + # briefing, study_explanation, study_session_analysis, study_memo_card). + # 빈 리스트 = 무동작 (기존 동작 그대로). + pipeline_held_stages: list[str] = [] + # PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini # GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거. study_explanation_enabled: bool = True @@ -244,6 +251,14 @@ def load_settings() -> Settings: ) ) + pipeline_held_stages: list[str] = [] + if config_path.exists() and raw and "pipeline" in raw: + held_raw = (raw.get("pipeline") or {}).get("held_stages") or [] + # 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용. + if not isinstance(held_raw, (list, tuple)): + held_raw = [held_raw] + pipeline_held_stages = [str(s) for s in held_raw] + taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {} document_types = raw.get("document_types", []) if config_path.exists() and raw else [] upload_cfg = ( @@ -272,6 +287,7 @@ def load_settings() -> Settings: study_explanation_enabled=study_explanation_enabled, study_card_extract_enabled=study_card_extract_enabled, internal_worker_token=internal_worker_token, + pipeline_held_stages=pipeline_held_stages, ) diff --git a/app/workers/briefing_worker.py b/app/workers/briefing_worker.py index 9ce62fc..679f26f 100644 --- a/app/workers/briefing_worker.py +++ b/app/workers/briefing_worker.py @@ -8,6 +8,7 @@ import asyncio from datetime import date +from core.config import settings from core.utils import setup_logger from services.briefing.pipeline import run_briefing_pipeline @@ -22,6 +23,9 @@ async def run(target_date: date | None = None) -> dict | None: Args: target_date: KST 기준 briefing_date (None = 오늘). API regenerate 가 명시 지정 가능. """ + if "briefing" in settings.pipeline_held_stages: + logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip") + return None try: result = await asyncio.wait_for( run_briefing_pipeline(target_date), diff --git a/app/workers/digest_worker.py b/app/workers/digest_worker.py index d3ebf45..477a954 100644 --- a/app/workers/digest_worker.py +++ b/app/workers/digest_worker.py @@ -10,6 +10,7 @@ global_digests / digest_topics 테이블에 저장한다. import asyncio +from core.config import settings from core.utils import setup_logger from services.digest.pipeline import run_digest_pipeline @@ -24,6 +25,9 @@ async def run() -> None: pipeline 자체는 timeout 으로 감싸지 않음 (per-call timeout 은 summarizer 가 처리). 여기서는 전체 hard cap 만 강제. """ + if "digest" in settings.pipeline_held_stages: + logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip") + return try: result = await asyncio.wait_for( run_digest_pipeline(), diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index c7ecbec..9008d43 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -13,12 +13,16 @@ from sqlalchemy import select, update, delete, exists from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import aliased +from core.config import settings from core.database import async_session from core.utils import setup_logger from models.queue import ProcessingQueue, StageDeferred, enqueue_stage, not_deferred_condition logger = setup_logger("queue_consumer") +# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만. +_hold_logged = False + # stage별 배치 크기 # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. @@ -335,14 +339,22 @@ async def _process_stage(stage, worker_fn): async def consume_queue(): """메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리.""" + global _hold_logged workers = _load_workers() + held = [s for s in MAIN_QUEUE_STAGES if s in settings.pipeline_held_stages] + if held and not _hold_logged: + logger.info(f"pipeline.held_stages 보류 중: {held} — claim 하지 않음 (pending 적체 = 의도)") + _hold_logged = True + try: await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES) except Exception: logger.exception("stale reset failed, but continuing queue consumption") for stage in MAIN_QUEUE_STAGES: + if stage in settings.pipeline_held_stages: + continue await _process_stage(stage, workers[stage]) diff --git a/app/workers/study_memo_card_jobs_consumer.py b/app/workers/study_memo_card_jobs_consumer.py index 9386bd7..bcae95b 100644 --- a/app/workers/study_memo_card_jobs_consumer.py +++ b/app/workers/study_memo_card_jobs_consumer.py @@ -14,6 +14,7 @@ from datetime import datetime, timedelta, timezone from sqlalchemy import select, update from sqlalchemy.exc import SQLAlchemyError +from core.config import settings from core.database import async_session from core.utils import setup_logger from models.study_memo_card_job import StudyMemoCardJob @@ -50,6 +51,10 @@ async def reset_stale_card_jobs() -> None: async def consume_study_memo_card_queue() -> None: """APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리.""" + # 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug). + if "study_memo_card" in settings.pipeline_held_stages: + logger.debug("study_memo_card 보류 (pipeline.held_stages)") + return await reset_stale_card_jobs() async with async_session() as session: diff --git a/app/workers/study_queue_consumer.py b/app/workers/study_queue_consumer.py index dfcef93..1d242b4 100644 --- a/app/workers/study_queue_consumer.py +++ b/app/workers/study_queue_consumer.py @@ -59,6 +59,11 @@ async def reset_stale_study_jobs() -> None: async def consume_study_queue() -> None: """APScheduler 진입점. pending job BATCH_SIZE 만큼 처리.""" + # 생성 LLM 홀드: env(study_explanation_enabled) 와 별개의 self-contained 게이트. + # pending 은 그대로 유지 (Mac mini derived-worker 흡수 경로도 본 게이트와 무관). + if "study_explanation" in settings.pipeline_held_stages: + logger.debug("study_explanation 보류 (pipeline.held_stages)") + return await reset_stale_study_jobs() async with async_session() as session: diff --git a/app/workers/study_session_queue_consumer.py b/app/workers/study_session_queue_consumer.py index 4ab9a90..4709d14 100644 --- a/app/workers/study_session_queue_consumer.py +++ b/app/workers/study_session_queue_consumer.py @@ -12,6 +12,7 @@ from datetime import datetime, timedelta, timezone from sqlalchemy import select, update from sqlalchemy.exc import SQLAlchemyError +from core.config import settings from core.database import async_session from core.utils import setup_logger from models.study_quiz_session_job import StudyQuizSessionJob @@ -48,6 +49,10 @@ async def reset_stale_session_jobs() -> None: async def consume_study_session_queue() -> None: """APScheduler 진입점. pending session_jobs 를 BATCH_SIZE 만큼 처리.""" + # 생성 LLM 홀드: claim 자체를 하지 않음 (1분 주기라 로그는 debug). + if "study_session_analysis" in settings.pipeline_held_stages: + logger.debug("study_session_analysis 보류 (pipeline.held_stages)") + return await reset_stale_session_jobs() async with async_session() as session: diff --git a/config.yaml b/config.yaml index 716a555..20d59d2 100644 --- a/config.yaml +++ b/config.yaml @@ -176,3 +176,14 @@ schedule: daily_digest: "20:00" file_watcher_interval_minutes: 5 queue_consumer_interval_minutes: 10 + +# 생성 LLM 홀드 (2026-06-11, 사용자 지시): 맥미니 모델 확정까지 생성 LLM 소비 스테이지 보류. +# - 큐: classify(triage)/summarize/deep_summary — claim 자체를 안 함 (attempts 미소모, pending 적체 = 의도) +# - cron/컨슈머: digest(global 04:00), briefing(05:10), study_explanation/study_session_analysis/ +# study_memo_card (1분 컨슈머) +# - 무영향: extract/embed/chunk/markdown/stt/preview/thumbnail/fulltext, 수집기 전부, +# 인터랙티브(ask/eid chat), daily_digest(LLM 미사용) +# 유효 키 = 위 8개 — 그 외 문자열은 무동작(오타 주의). 해제 = held_stages: [] 후 fastapi 재기동. +pipeline: + held_stages: ["classify", "summarize", "deep_summary", "digest", "briefing", + "study_explanation", "study_session_analysis", "study_memo_card"] diff --git a/tests/test_pipeline_hold.py b/tests/test_pipeline_hold.py new file mode 100644 index 0000000..b3a8b1f --- /dev/null +++ b/tests/test_pipeline_hold.py @@ -0,0 +1,168 @@ +"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트. + +홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉). +비-held 스테이지는 기존과 동일하게 처리된다. +""" + +import pytest + +from core.config import Settings, settings +from workers import digest_worker, queue_consumer + + +def _fake_consumer_env(monkeypatch, held): + processed = [] + + async def fake_process(stage, worker): + processed.append(stage) + + async def fake_reset(stages, threshold): + return None + + monkeypatch.setattr(queue_consumer, "_process_stage", fake_process) + monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset) + monkeypatch.setattr( + queue_consumer, "_load_workers", + lambda: {s: object() for s in queue_consumer.MAIN_QUEUE_STAGES + ["markdown"]}, + ) + monkeypatch.setattr(queue_consumer, "_hold_logged", False) + monkeypatch.setattr(settings, "pipeline_held_stages", held) + return processed + + +def test_settings_default_empty(): + """미설정 시 빈 리스트 = 무동작 (기존 동작 무회귀).""" + assert Settings().pipeline_held_stages == [] + + +@pytest.mark.asyncio +async def test_consume_queue_skips_held_stages(monkeypatch): + processed = _fake_consumer_env( + monkeypatch, ["classify", "summarize", "deep_summary"] + ) + + await queue_consumer.consume_queue() + + assert "classify" not in processed + assert "summarize" not in processed + assert "deep_summary" not in processed + # GPU/특화 스테이지는 계속 처리 + for stage in ("extract", "embed", "chunk", "stt", "fulltext"): + assert stage in processed + + +@pytest.mark.asyncio +async def test_consume_queue_empty_hold_processes_all(monkeypatch): + processed = _fake_consumer_env(monkeypatch, []) + + await queue_consumer.consume_queue() + + assert processed == list(queue_consumer.MAIN_QUEUE_STAGES) + + +@pytest.mark.asyncio +async def test_markdown_consumer_not_held(monkeypatch): + """markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환).""" + processed = _fake_consumer_env( + monkeypatch, ["classify", "summarize", "deep_summary", "digest"] + ) + + await queue_consumer.consume_markdown_queue() + + assert processed == ["markdown"] + + +@pytest.mark.asyncio +async def test_digest_worker_held_returns_before_pipeline(monkeypatch): + called = {"pipeline": False} + + async def fake_pipeline(): + called["pipeline"] = True + return {} + + monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline) + monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"]) + + await digest_worker.run() + + assert called["pipeline"] is False + + +@pytest.mark.asyncio +async def test_digest_worker_unheld_runs_pipeline(monkeypatch): + called = {"pipeline": False} + + async def fake_pipeline(): + called["pipeline"] = True + return {"clusters": 0} + + monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline) + monkeypatch.setattr(settings, "pipeline_held_stages", []) + + await digest_worker.run() + + assert called["pipeline"] is True + + +@pytest.mark.asyncio +async def test_briefing_worker_held_returns_before_pipeline(monkeypatch): + from workers import briefing_worker + + called = {"pipeline": False} + + async def fake_pipeline(target_date): + called["pipeline"] = True + return {} + + monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline) + monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"]) + + assert await briefing_worker.run() is None + assert called["pipeline"] is False + + +@pytest.mark.asyncio +async def test_study_explanation_consumer_held(monkeypatch): + from workers import study_queue_consumer + + touched = [] + + async def fake_reset(): + touched.append("reset") + + monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset) + monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"]) + + await study_queue_consumer.consume_study_queue() + + assert touched == [] + + +@pytest.mark.asyncio +async def test_study_consumers_held_no_db_touch(monkeypatch): + """held 시 stale reset 포함 DB 접근 0 — claim 미발생 실증.""" + from workers import study_memo_card_jobs_consumer, study_session_queue_consumer + + touched = [] + + async def fake_reset_session(): + touched.append("session_reset") + + async def fake_reset_card(): + touched.append("card_reset") + + monkeypatch.setattr( + study_session_queue_consumer, "reset_stale_session_jobs", fake_reset_session + ) + monkeypatch.setattr( + study_memo_card_jobs_consumer, "reset_stale_card_jobs", fake_reset_card + ) + monkeypatch.setattr( + settings, "pipeline_held_stages", + ["study_session_analysis", "study_memo_card"], + ) + + await study_session_queue_consumer.consume_study_session_queue() + await study_memo_card_jobs_consumer.consume_study_memo_card_queue() + + assert touched == []