diff --git a/app/main.py b/app/main.py index 66ef288..b2b4339 100644 --- a/app/main.py +++ b/app/main.py @@ -61,7 +61,7 @@ async def lifespan(app: FastAPI): from workers.csb_collector import run as csb_collector_run from workers.api_standards_collector import run as api_standards_run from workers.ccps_collector import run as ccps_collector_run - from workers.queue_consumer import consume_queue, consume_markdown_queue + from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue from workers.study_queue_consumer import consume_study_queue from workers.study_session_queue_consumer import consume_study_session_queue from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue @@ -95,6 +95,9 @@ async def lifespan(app: FastAPI): # 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을 # stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지. scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer") + # 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 — + # classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례). + scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") # PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리). diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 9008d43..94cbe57 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -28,7 +28,10 @@ _hold_logged = False # deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. # fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면 # 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수. -BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, +# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이 +# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음. +# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일). +BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1, "fulltext": 3} STALE_THRESHOLD_MINUTES = 10 @@ -38,14 +41,21 @@ STALE_THRESHOLD_MINUTES = 10 # 따라서 markdown consumer 는 별도의 generous 임계를 쓴다. MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120")) -# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리. +# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue, +# embed/chunk 는 consume_fast_queue (2026-06-12) 로 분리 — 세 집합은 disjoint +# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험). # STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). MAIN_QUEUE_STAGES = [ - "extract", "classify", "summarize", "embed", "chunk", + "extract", "classify", "summarize", "preview", "stt", "thumbnail", "deep_summary", "fulltext", ] MARKDOWN_QUEUE_STAGES = ["markdown"] +# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비. +# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다 +# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴. +FAST_QUEUE_STAGES = ["embed", "chunk"] + async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES): """processing 상태로 오래 방치된 항목 복구 (지정 stage 한정) @@ -358,6 +368,27 @@ async def consume_queue(): await _process_stage(stage, workers[stage]) +async def consume_fast_queue(): + """embed/chunk 전용 고속 소비자 — LLM 사이클과 완전 디커플 (2026-06-12). + + main 루프는 classify/summarize/deep 가 사이클을 분 단위로 점유해 건당 <1s 짜리 + embed/chunk 가 사이클당 1번씩만 기회를 얻었다 (실효 ~60건/시 = 적체 원인). + 분리 후 = 1분 잡 × 배치 10 → 캡 ~600건/시. APScheduler max_instances=1 이라 + 배치가 1분을 넘으면 다음 fire 는 coalesce (폭주 방지). + """ + workers = _load_workers() + + try: + await reset_stale_items(FAST_QUEUE_STAGES, STALE_THRESHOLD_MINUTES) + except Exception: + logger.exception("fast stale reset failed, but continuing queue consumption") + + for stage in FAST_QUEUE_STAGES: + if stage in settings.pipeline_held_stages: + continue + await _process_stage(stage, workers[stage]) + + async def consume_markdown_queue(): """markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리. diff --git a/tests/test_pipeline_hold.py b/tests/test_pipeline_hold.py index b3a8b1f..b6613db 100644 --- a/tests/test_pipeline_hold.py +++ b/tests/test_pipeline_hold.py @@ -23,7 +23,11 @@ def _fake_consumer_env(monkeypatch, held): monkeypatch.setattr(queue_consumer, "reset_stale_items", fake_reset) monkeypatch.setattr( queue_consumer, "_load_workers", - lambda: {s: object() for s in queue_consumer.MAIN_QUEUE_STAGES + ["markdown"]}, + lambda: { + s: object() + for s in (queue_consumer.MAIN_QUEUE_STAGES + + queue_consumer.FAST_QUEUE_STAGES + ["markdown"]) + }, ) monkeypatch.setattr(queue_consumer, "_hold_logged", False) monkeypatch.setattr(settings, "pipeline_held_stages", held) @@ -46,8 +50,8 @@ async def test_consume_queue_skips_held_stages(monkeypatch): assert "classify" not in processed assert "summarize" not in processed assert "deep_summary" not in processed - # GPU/특화 스테이지는 계속 처리 - for stage in ("extract", "embed", "chunk", "stt", "fulltext"): + # 특화 스테이지는 계속 처리 (embed/chunk 는 2026-06-12 fast 컨슈머로 분리) + for stage in ("extract", "stt", "fulltext"): assert stage in processed @@ -60,6 +64,36 @@ async def test_consume_queue_empty_hold_processes_all(monkeypatch): assert processed == list(queue_consumer.MAIN_QUEUE_STAGES) +@pytest.mark.asyncio +async def test_fast_consumer_processes_embed_chunk_only(monkeypatch): + """fast 컨슈머(2026-06-12 분리) = embed/chunk 전용, LLM 사이클과 디커플.""" + processed = _fake_consumer_env(monkeypatch, []) + + await queue_consumer.consume_fast_queue() + + assert processed == ["embed", "chunk"] + + +@pytest.mark.asyncio +async def test_fast_consumer_respects_hold(monkeypatch): + processed = _fake_consumer_env(monkeypatch, ["embed"]) + + await queue_consumer.consume_fast_queue() + + assert processed == ["chunk"] + + +def test_fast_split_invariants(): + """세 컨슈머 stage 집합 disjoint + embed/chunk 배치 상향 회귀 가드.""" + main = set(queue_consumer.MAIN_QUEUE_STAGES) + fast = set(queue_consumer.FAST_QUEUE_STAGES) + md = set(queue_consumer.MARKDOWN_QUEUE_STAGES) + assert not (main & fast) and not (main & md) and not (fast & md) + assert fast == {"embed", "chunk"} + assert queue_consumer.BATCH_SIZE["embed"] >= 10 + assert queue_consumer.BATCH_SIZE["chunk"] >= 10 + + @pytest.mark.asyncio async def test_markdown_consumer_not_held(monkeypatch): """markdown 컨슈머는 홀드 비대상 (LLM 무관 — marker GPU 변환)."""