Files
hyungi_document_server/tests/test_pipeline_hold.py
T
hyungi 5dca5b5d28 ops(pipeline): embed/chunk 고속 컨슈머 분리 + 배치 1→10 — LLM 사이클 인질 해소
진단(2026-06-12 용량 평가): 단일 루프에서 classify(~190s×3)가 사이클을 점유,
건당 <1s 인 embed/chunk 가 사이클당 1건 캡 → 실효 ~580/일 vs 수요 최대 2,700/일,
적체 3,570 + 신규 문서 벡터 미적재(RAG 검색 누락). 4070 가동률 0% = 순수 구조 캡.
수리 = markdown 분리(05-01) 선례: consume_fast_queue 1분 잡 + 배치 10(GPU 공유 보수값,
캡 ~14,400/일). 세 컨슈머 stage 집합 disjoint(stale reset 이중 복구 방지). retrieval
로직·임베딩 모델 무접촉.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 07:50:07 +09:00

203 lines
6.1 KiB
Python

"""생성 LLM 홀드 (pipeline.held_stages) — 컨슈머/워커 게이트 동작 테스트.
홀드 시멘틱: held 스테이지는 claim 자체를 하지 않는다 (attempts 미소모, DB 무접촉).
비-held 스테이지는 기존과 동일하게 처리된다.
"""
import pytest
from core.config import Settings, settings
from workers import digest_worker, queue_consumer
def _fake_consumer_env(monkeypatch, held):
    """Stub out the queue consumer's collaborators and apply a hold list.

    Replaces ``queue_consumer._process_stage`` with a recorder,
    ``queue_consumer.reset_stale_items`` with a no-op, and
    ``queue_consumer._load_workers`` with a stub that hands back one
    placeholder object per stage name. Also resets
    ``queue_consumer._hold_logged`` to ``False`` and sets
    ``settings.pipeline_held_stages`` to ``held``.

    Returns the list that collects, in call order, every stage name passed
    to the patched ``_process_stage``.
    """
    seen_stages = []

    async def record_stage(stage, worker):
        # Only the stage name matters to the assertions; the worker is ignored.
        seen_stages.append(stage)

    async def skip_stale_reset(stages, threshold):
        return None

    def stub_workers():
        # Stage lists are read at call time, exactly when the consumer asks
        # for its workers, and every call builds fresh placeholder objects.
        stage_names = (
            queue_consumer.MAIN_QUEUE_STAGES
            + queue_consumer.FAST_QUEUE_STAGES
            + ["markdown"]
        )
        return {name: object() for name in stage_names}

    patches = (
        (queue_consumer, "_process_stage", record_stage),
        (queue_consumer, "reset_stale_items", skip_stale_reset),
        (queue_consumer, "_load_workers", stub_workers),
        (queue_consumer, "_hold_logged", False),
        (settings, "pipeline_held_stages", held),
    )
    for target, attribute, replacement in patches:
        monkeypatch.setattr(target, attribute, replacement)

    return seen_stages
def test_settings_default_empty():
    """Unset hold list defaults to empty, i.e. a no-op (no regression)."""
    fresh_settings = Settings()
    assert fresh_settings.pipeline_held_stages == []
@pytest.mark.asyncio
async def test_consume_queue_skips_held_stages(monkeypatch):
    """Held stages are not processed by the main consumer; others still are."""
    held_names = ("classify", "summarize", "deep_summary")
    handled = _fake_consumer_env(monkeypatch, list(held_names))

    await queue_consumer.consume_queue()

    for name in held_names:
        assert name not in handled
    # Specialized stages keep being processed (embed/chunk were moved to the
    # fast consumer on 2026-06-12, so they are not expected here).
    for name in ("extract", "stt", "fulltext"):
        assert name in handled
@pytest.mark.asyncio
async def test_consume_queue_empty_hold_processes_all(monkeypatch):
    """With nothing held, the main consumer processes every main stage in order."""
    handled = _fake_consumer_env(monkeypatch, [])

    await queue_consumer.consume_queue()

    expected = list(queue_consumer.MAIN_QUEUE_STAGES)
    assert handled == expected
@pytest.mark.asyncio
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
    """Fast consumer (split out 2026-06-12) handles embed/chunk only, decoupled from the LLM cycle."""
    handled = _fake_consumer_env(monkeypatch, [])

    await queue_consumer.consume_fast_queue()

    expected = ["embed", "chunk"]
    assert handled == expected
@pytest.mark.asyncio
async def test_fast_consumer_respects_hold(monkeypatch):
    """Holding "embed" leaves only "chunk" processed by the fast consumer."""
    handled = _fake_consumer_env(monkeypatch, ["embed"])

    await queue_consumer.consume_fast_queue()

    expected = ["chunk"]
    assert handled == expected
def test_fast_split_invariants():
    """The three consumers' stage sets are disjoint; regression guard for the raised embed/chunk batch size."""
    main_stages = set(queue_consumer.MAIN_QUEUE_STAGES)
    fast_stages = set(queue_consumer.FAST_QUEUE_STAGES)
    markdown_stages = set(queue_consumer.MARKDOWN_QUEUE_STAGES)

    # No stage may belong to more than one consumer.
    assert main_stages.isdisjoint(fast_stages)
    assert main_stages.isdisjoint(markdown_stages)
    assert fast_stages.isdisjoint(markdown_stages)

    assert fast_stages == {"embed", "chunk"}

    # Batch sizes for the fast stages must stay at 10 or above.
    for name in ("embed", "chunk"):
        assert queue_consumer.BATCH_SIZE[name] >= 10
@pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch):
    """The markdown consumer is not subject to the hold (unrelated to the LLM — marker GPU conversion)."""
    llm_holds = ["classify", "summarize", "deep_summary", "digest"]
    handled = _fake_consumer_env(monkeypatch, llm_holds)

    await queue_consumer.consume_markdown_queue()

    expected = ["markdown"]
    assert handled == expected
@pytest.mark.asyncio
async def test_digest_worker_held_returns_before_pipeline(monkeypatch):
    """With "digest" held, the digest worker returns without running the pipeline."""
    pipeline_calls = []

    async def fake_pipeline():
        pipeline_calls.append(True)
        return {}

    monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["digest"])

    await digest_worker.run()

    assert not pipeline_calls
@pytest.mark.asyncio
async def test_digest_worker_unheld_runs_pipeline(monkeypatch):
    """With nothing held, the digest worker does invoke the pipeline."""
    pipeline_calls = []

    async def fake_pipeline():
        pipeline_calls.append(True)
        return {"clusters": 0}

    monkeypatch.setattr(digest_worker, "run_digest_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", [])

    await digest_worker.run()

    assert pipeline_calls
@pytest.mark.asyncio
async def test_briefing_worker_held_returns_before_pipeline(monkeypatch):
    """With "briefing" held, the briefing worker returns None and skips the pipeline."""
    from workers import briefing_worker

    pipeline_calls = []

    async def fake_pipeline(target_date):
        pipeline_calls.append(True)
        return {}

    monkeypatch.setattr(briefing_worker, "run_briefing_pipeline", fake_pipeline)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["briefing"])

    outcome = await briefing_worker.run()

    assert outcome is None
    assert not pipeline_calls
@pytest.mark.asyncio
async def test_study_explanation_consumer_held(monkeypatch):
    """With "study_explanation" held, the consumer never calls the stale-job reset."""
    from workers import study_queue_consumer

    reset_calls = []

    async def fake_reset():
        reset_calls.append("reset")

    monkeypatch.setattr(study_queue_consumer, "reset_stale_study_jobs", fake_reset)
    monkeypatch.setattr(settings, "pipeline_held_stages", ["study_explanation"])

    await study_queue_consumer.consume_study_queue()

    assert reset_calls == []
@pytest.mark.asyncio
async def test_study_consumers_held_no_db_touch(monkeypatch):
    """When held there is zero DB access, stale reset included — demonstrates no claim occurs."""
    from workers import study_memo_card_jobs_consumer, study_session_queue_consumer

    reset_calls = []

    async def fake_reset_session():
        reset_calls.append("session_reset")

    async def fake_reset_card():
        reset_calls.append("card_reset")

    monkeypatch.setattr(
        study_session_queue_consumer,
        "reset_stale_session_jobs",
        fake_reset_session,
    )
    monkeypatch.setattr(
        study_memo_card_jobs_consumer,
        "reset_stale_card_jobs",
        fake_reset_card,
    )
    held_stages = ["study_session_analysis", "study_memo_card"]
    monkeypatch.setattr(settings, "pipeline_held_stages", held_stages)

    await study_session_queue_consumer.consume_study_session_queue()
    await study_memo_card_jobs_consumer.consume_study_memo_card_queue()

    # Neither stub ran, so neither consumer reached its stale-job reset.
    assert reset_calls == []