ops(pipeline): embed/chunk 고속 컨슈머 분리 + 배치 1→10 — LLM 사이클 인질 해소

진단(2026-06-12 용량 평가): 단일 루프에서 classify(~190s×3)가 사이클을 점유,
건당 <1s 인 embed/chunk 가 사이클당 1건 캡 → 실효 ~580/일 vs 수요 최대 2,700/일,
적체 3,570 + 신규 문서 벡터 미적재(RAG 검색 누락). 4070 가동률 0% = 순수 구조 캡.
수리 = markdown 분리(05-01) 선례: consume_fast_queue 1분 잡 + 배치 10(GPU 공유 보수값,
캡 ~14,400/일). 세 컨슈머 stage 집합 disjoint(stale reset 이중 복구 방지). retrieval
로직·임베딩 모델 무접촉.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-12 07:50:07 +09:00
parent 9c9ff6eeba
commit 5dca5b5d28
3 changed files with 75 additions and 7 deletions
+4 -1
View File
@@ -61,7 +61,7 @@ async def lifespan(app: FastAPI):
from workers.csb_collector import run as csb_collector_run from workers.csb_collector import run as csb_collector_run
from workers.api_standards_collector import run as api_standards_run from workers.api_standards_collector import run as api_standards_run
from workers.ccps_collector import run as ccps_collector_run from workers.ccps_collector import run as ccps_collector_run
from workers.queue_consumer import consume_queue, consume_markdown_queue from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
@@ -95,6 +95,9 @@ async def lifespan(app: FastAPI):
# 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을 # 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을
# stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지. # stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지.
scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer") scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer")
# 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 —
# classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례).
scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리). # PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
+34 -3
View File
@@ -28,7 +28,10 @@ _hold_logged = False
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면 # fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수. # 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, # embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3} "fulltext": 3}
STALE_THRESHOLD_MINUTES = 10 STALE_THRESHOLD_MINUTES = 10
@@ -38,14 +41,21 @@ STALE_THRESHOLD_MINUTES = 10
# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다. # 따라서 markdown consumer 는 별도의 generous 임계를 쓴다.
MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120")) MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"))
# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리. # consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue,
# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [ MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk", "extract", "classify", "summarize",
"preview", "stt", "thumbnail", "deep_summary", "fulltext", "preview", "stt", "thumbnail", "deep_summary", "fulltext",
] ]
MARKDOWN_QUEUE_STAGES = ["markdown"] MARKDOWN_QUEUE_STAGES = ["markdown"]
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
FAST_QUEUE_STAGES = ["embed", "chunk"]
async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES): async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES):
"""processing 상태로 오래 방치된 항목 복구 (지정 stage 한정) """processing 상태로 오래 방치된 항목 복구 (지정 stage 한정)
@@ -358,6 +368,27 @@ async def consume_queue():
await _process_stage(stage, workers[stage]) await _process_stage(stage, workers[stage])
async def consume_fast_queue():
"""embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12).
main 루프는 classify/summarize/deep 가 사이클을 분 단위로 점유해 건당 <1s 짜리
embed/chunk 가 사이클당 1번씩만 기회를 얻었다 (실효 ~60건/시 = 적체 원인).
분리 후 = 1분 잡 × 배치 10 → 캡 ~600건/시. APScheduler max_instances=1 이라
배치가 1분을 넘으면 다음 fire 는 coalesce (폭주 방지).
"""
workers = _load_workers()
try:
await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("fast stale reset failed, but continuing queue consumption")
for stage in FAST_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
async def consume_markdown_queue(): async def consume_markdown_queue():
"""markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리. """markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리.
+37 -3
View File
@@ -23,7 +23,11 @@ def _fake_consumer_env(monkeypatch, held):
monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset) monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset)
monkeypatch.setattr( monkeypatch.setattr(
queue_consumer, "_load_workers", queue_consumer, "_load_workers",
lambda: {s: object() for s in queue_consumer.MAIN_QUEUE_STAGES + ["markdown"]}, lambda: {
s: object()
for s in (queue_consumer.MAIN_QUEUE_STAGES
+ queue_consumer.FAST_QUEUE_STAGES + ["markdown"])
},
) )
monkeypatch.setattr(queue_consumer, "_hold_logged", False) monkeypatch.setattr(queue_consumer, "_hold_logged", False)
monkeypatch.setattr(settings, "pipeline_held_stages", held) monkeypatch.setattr(settings, "pipeline_held_stages", held)
@@ -46,8 +50,8 @@ async def test_consume_queue_skips_held_stages(monkeypatch):
assert "classify" not in processed assert "classify" not in processed
assert "summarize" not in processed assert "summarize" not in processed
assert "deep_summary" not in processed assert "deep_summary" not in processed
# GPU/특화 스테이지는 계속 처리 # 특화 스테이지는 계속 처리 (embed/chunk 는 2026-06-12 fast 컨슈머로 분리)
for stage in ("extract", "embed", "chunk", "stt", "fulltext"): for stage in ("extract", "stt", "fulltext"):
assert stage in processed assert stage in processed
@@ -60,6 +64,36 @@ async def test_consume_queue_empty_hold_processes_all(monkeypatch):
assert processed == list(queue_consumer.MAIN_QUEUE_STAGES) assert processed == list(queue_consumer.MAIN_QUEUE_STAGES)
@pytest.mark.asyncio
async def test_fast_consumer_processes_embed_chunk_only(monkeypatch):
"""fast 컨슈머(2026-06-12 분리) = embed/chunk 전용, LLM 사이클과 디커플."""
processed = _fake_consumer_env(monkeypatch, [])
await queue_consumer.consume_fast_queue()
assert processed == ["embed", "chunk"]
@pytest.mark.asyncio
async def test_fast_consumer_respects_hold(monkeypatch):
processed = _fake_consumer_env(monkeypatch, ["embed"])
await queue_consumer.consume_fast_queue()
assert processed == ["chunk"]
def test_fast_split_invariants():
"""세 컨슈머 stage 집합 disjoint + embed/chunk 배치 상향 회귀 가드."""
main = set(queue_consumer.MAIN_QUEUE_STAGES)
fast = set(queue_consumer.FAST_QUEUE_STAGES)
md = set(queue_consumer.MARKDOWN_QUEUE_STAGES)
assert not (main & fast) and not (main & md) and not (fast & md)
assert fast == {"embed", "chunk"}
assert queue_consumer.BATCH_SIZE["embed"] >= 10
assert queue_consumer.BATCH_SIZE["chunk"] >= 10
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_markdown_consumer_not_held(monkeypatch): async def test_markdown_consumer_not_held(monkeypatch):
"""markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환).""" """markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""