a82b0724df
2026-06-11 맥미니 모델 교체(Gemma4 26B→Qwen3.6-27B-6bit, 콜당 ~90~300s)의 타임아웃 상향 sweep 이 config.yaml/synthesis 만 갱신하고 digest/briefing 코드의 하드코딩 LLM_CALL_TIMEOUT=25(빠른 Gemma 기준)를 누락 → digest 600s 하드캡 초과로 06-10 이후 미생성, briefing 4/4 LLM 폴백(status=failed). (적대 리뷰로 블로커 정정: concurrency=1 사설 세마포로는 digest 44~68 클러스터가 하드캡에 여전히 걸림 + llm_gate 영구 룰 위반.) - 타임아웃·재시도·하드캡을 config.pipeline 단일소스로 이관(digest_llm_timeout_s=300, attempts=2, pipeline_hard_cap_s=3000). 다음 모델 교체 때 재발 차단. - digest/briefing LLM 호출을 사설 Semaphore 제거하고 전역 MLX gate(BACKGROUND) 경유로 변경 — llm_gate 영구 룰(같은 endpoint 단일 게이트, 새 Semaphore 금지) 준수 + ask/eid(FOREGROUND)와 조율. 동시성 lever = 기존 mlx_gate_concurrency 2→4 (continuous batching 실측 — 3동시콜 wall 121s ≈ 단일콜, 직렬 대비 ~3배). - digest/briefing pipeline cluster 루프를 asyncio.gather 동시 실행으로 전환 (실동시성은 게이트가 제한, rank/순서 보존). - deep_summary(70~300s)를 메인 consume_queue 에서 분리해 consume_deep_queue 신설 (markdown/fast split 선례) — 단일 deep 호출이 1분 틱 초과로 메인 큐를 영구 coalesce 시키던 문제 제거. - 죽은 PIPELINE_HARD_CAP=600(briefing/pipeline.py) 제거, summarizer docstring 갱신, deep 컨슈머 disjoint/hold 테스트 추가. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
228 lines
7.0 KiB
Python
228 lines
7.0 KiB
Python
"""Generation-LLM hold (pipeline.held_stages) — tests for consumer/worker gate behavior.

Hold semantics: a held stage is never claimed at all (no attempts consumed, no DB access).
Non-held stages are processed exactly as before.
"""
|
|
|
|
import pytest
|
|
|
|
from core.config import Settings, settings
|
|
from workers import digest_worker, queue_consumer
|
|
|
|
|
|
def _fake_consumer_env(monkeypatch, held):
    """Patch queue_consumer's collaborators and return the list of processed stages.

    Replaces ``_process_stage``, ``reset_stale_items`` and ``_load_workers`` on
    ``queue_consumer`` with in-memory stubs, resets ``_hold_logged`` to False,
    and sets ``settings.pipeline_held_stages`` to ``held``.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture used for every patch.
        held: value to install as ``settings.pipeline_held_stages``.

    Returns:
        The list that the stubbed ``_process_stage`` appends each stage to,
        in call order.
    """
    seen_stages = []

    async def fake_process(stage, worker):
        # Record the stage only; the worker object is ignored.
        seen_stages.append(stage)

    async def fake_reset(stages, threshold):
        return None

    def fake_load_workers():
        # Built on each call (as the original lambda did) so the stage
        # constants are read at call time, not at patch time.
        stage_names = (
            queue_consumer.MAIN_QUEUE_STAGES
            + queue_consumer.FAST_QUEUE_STAGES
            + queue_consumer.DEEP_QUEUE_STAGES
            + ["markdown"]
        )
        return {name: object() for name in stage_names}

    patches = (
        (queue_consumer, "_process_stage", fake_process),
        (queue_consumer, "reset_stale_items", fake_reset),
        (queue_consumer, "_load_workers", fake_load_workers),
        (queue_consumer, "_hold_logged", False),
        (settings, "pipeline_held_stages", held),
    )
    for target, attribute, replacement in patches:
        monkeypatch.setattr(target, attribute, replacement)

    return seen_stages
|
|
|
|
|
|
def test_settings_default_empty():
    """With nothing configured, held stages default to an empty list (no-op, no regression)."""
    fresh_settings = Settings()
    assert fresh_settings.pipeline_held_stages == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_consume_queue_skips_held_stages(monkeypatch):
    """Held stages are not processed by the main consumer; the others still are."""
    held_stages = ("classify", "summarize", "deep_summary")
    seen = _fake_consumer_env(monkeypatch, list(held_stages))

    await queue_consumer.consume_queue()

    for stage in held_stages:
        assert stage not in seen
    # Specialised stages keep running (embed/chunk moved to the fast consumer on 2026-06-12).
    still_running = ("extract", "stt", "fulltext")
    for stage in still_running:
        assert stage in seen
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
    """With no held stages, the main consumer processes every MAIN_QUEUE_STAGES entry in order."""
    seen = _fake_consumer_env(monkeypatch, [])

    await queue_consumer.consume_queue()

    expected_order = list(queue_consumer.MAIN_QUEUE_STAGES)
    assert seen == expected_order
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
    """Fast consumer (split out 2026-06-12) handles only embed/chunk, decoupled from the LLM cycle."""
    seen = _fake_consumer_env(monkeypatch, [])

    await queue_consumer.consume_fast_queue()

    expected_order = ["embed", "chunk"]
    assert seen == expected_order
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fast_consumer_respects_hold(monkeypatch):
    """Holding ``embed`` leaves only ``chunk`` processed by the fast consumer."""
    seen = _fake_consumer_env(monkeypatch, ["embed"])

    await queue_consumer.consume_fast_queue()

    expected_order = ["chunk"]
    assert seen == expected_order
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deep_consumer_processes_deep_only(monkeypatch):
    """Deep consumer (split out 2026-06-15) handles only deep_summary, decoupled from the main loop."""
    seen = _fake_consumer_env(monkeypatch, [])

    await queue_consumer.consume_deep_queue()

    expected_order = ["deep_summary"]
    assert seen == expected_order
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deep_consumer_respects_hold(monkeypatch):
    """When deep_summary is held, the deep consumer processes nothing."""
    seen = _fake_consumer_env(monkeypatch, ["deep_summary"])

    await queue_consumer.consume_deep_queue()

    nothing_processed = []
    assert seen == nothing_processed
|
|
|
|
|
|
def test_fast_split_invariants():
    """Guard the four-consumer split: disjoint stage sets, expected members, raised batch sizes."""
    stage_sets = {
        "main": set(queue_consumer.MAIN_QUEUE_STAGES),
        "fast": set(queue_consumer.FAST_QUEUE_STAGES),
        "md": set(queue_consumer.MARKDOWN_QUEUE_STAGES),
        "deep": set(queue_consumer.DEEP_QUEUE_STAGES),
    }

    # Every pair of consumers must own disjoint stages.
    labels = list(stage_sets)
    for position, left in enumerate(labels):
        for right in labels[position + 1:]:
            assert not (stage_sets[left] & stage_sets[right])

    assert stage_sets["fast"] == {"embed", "chunk"}
    assert stage_sets["deep"] == {"deep_summary"}
    # Regression guard for the 2026-06-15 deep split.
    assert "deep_summary" not in stage_sets["main"]

    for batched_stage in ("embed", "chunk"):
        assert queue_consumer.BATCH_SIZE[batched_stage] >= 10
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch):
    """The markdown consumer still processes ``markdown`` while LLM-related stages are held."""
    held_stages = ["classify", "summarize", "deep_summary", "digest"]
    seen = _fake_consumer_env(monkeypatch, held_stages)

    await queue_consumer.consume_markdown_queue()

    expected_order = ["markdown"]
    assert seen == expected_order
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
    """When ``digest`` is held, ``digest_worker.run`` never invokes the digest pipeline."""
    pipeline_calls = []

    async def fake_pipeline():
        pipeline_calls.append(True)
        return {}

    monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])

    await digest_worker.run()

    pipeline_ran = bool(pipeline_calls)
    assert pipeline_ran is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
    """With no held stages, ``digest_worker.run`` invokes the digest pipeline."""
    pipeline_calls = []

    async def fake_pipeline():
        pipeline_calls.append(True)
        return {"clusters": 0}

    monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", [])

    await digest_worker.run()

    pipeline_ran = bool(pipeline_calls)
    assert pipeline_ran is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
    """When ``briefing`` is held, ``briefing_worker.run`` returns None without running the pipeline."""
    from workers import briefing_worker

    pipeline_calls = []

    async def fake_pipeline(target_date):
        pipeline_calls.append(True)
        return {}

    monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])

    outcome = await briefing_worker.run()

    assert outcome is None
    pipeline_ran = bool(pipeline_calls)
    assert pipeline_ran is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_study_explanation_consumer_held(monkeypatch):
    """When ``study_explanation`` is held, the study consumer does not call the stale-job reset."""
    from workers import study_queue_consumer

    reset_log = []

    async def fake_reset():
        reset_log.append("reset")

    monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])

    await study_queue_consumer.consume_study_queue()

    nothing_touched = []
    assert reset_log == nothing_touched
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_study_consumers_held_no_db_touch(monkeypatch):
    """When held, neither study consumer calls its stale-reset hook — evidence that no claim happens."""
    from workers import study_memo_card_jobs_consumer, study_session_queue_consumer

    reset_log = []

    def make_reset_stub(tag):
        # One stub per consumer; each records its own tag if it is ever awaited.
        async def fake_reset():
            reset_log.append(tag)

        return fake_reset

    monkeypatch.setattr(
        study_session_queue_consumer,
        "reset_stale_session_jobs",
        make_reset_stub("session_reset"),
    )
    monkeypatch.setattr(
        study_memo_card_jobs_consumer,
        "reset_stale_card_jobs",
        make_reset_stub("card_reset"),
    )
    held_stages = ["study_session_analysis", "study_memo_card"]
    monkeypatch.setattr(settings, "pipeline_held_stages", held_stages)

    await study_session_queue_consumer.consume_study_session_queue()
    await study_memo_card_jobs_consumer.consume_study_memo_card_queue()

    nothing_touched = []
    assert reset_log == nothing_touched
|