From 2edc80d4bb0e781067905271adc977e95d8dade6 Mon Sep 17 00:00:00 2001 From: hyungi Date: Sun, 24 May 2026 10:33:45 +0000 Subject: [PATCH] fix(search): split markdown into dedicated queue consumer to prevent pipeline stall MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 대형 PDF split 변환(5210 ≈ 40분 실측)이 단일 consume_queue 코루틴을 점유해 extract/classify/embed/chunk 등 전 파이프라인을 stall 시키던 문제 제거. - consume_markdown_queue 신규 — markdown 전용 scheduler job (id=markdown_consumer) - consume_queue 는 MAIN_QUEUE_STAGES (markdown 제외) 만 처리 - _process_stage / _load_workers 헬퍼로 per-stage 로직 공유 - reset_stale_items(stages, threshold_minutes) 파라미터화: main=10min(markdown 제외), markdown=MARKDOWN_STALE_MINUTES(기본 120). marker_worker 는 heartbeat 미기록이라 40분 변환을 10분 stale 로 오인하던 함정 차단 - enqueue flow (classify -> embed,chunk,markdown) 불변 STT/deep_summary 분리 + GPU 동시성 튜닝은 out of scope (follow-up). Co-Authored-By: Claude Opus 4.7 (1M context) --- app/main.py | 6 +- app/workers/queue_consumer.py | 234 +++++++++++++++++++++------------- 2 files changed, 151 insertions(+), 89 deletions(-) diff --git a/app/main.py b/app/main.py index b9c1e34..68c380b 100644 --- a/app/main.py +++ b/app/main.py @@ -51,7 +51,7 @@ async def lifespan(app: FastAPI): from workers.law_monitor import run as law_monitor_run from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run - from workers.queue_consumer import consume_queue + from workers.queue_consumer import consume_queue, consume_markdown_queue from workers.study_queue_consumer import consume_study_queue from workers.study_session_queue_consumer import consume_study_session_queue from workers.study_question_embed_worker import ( @@ -77,6 +77,10 @@ async def lifespan(app: FastAPI): scheduler = AsyncIOScheduler(timezone="Asia/Seoul") # 상시 실행 scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer") + # PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) 전용 consumer. + # 대형 PDF split 변환(수십 분)이 메인 consume_queue 를 점유해 전 파이프라인을 + # stall 시키던 문제 제거. max_instances=1(기본) 으로 동시 marker 변환 2건은 방지. + scheduler.add_job(consume_markdown_queue, "interval", minutes=1, id="markdown_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") # PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리). diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 8212d04..5478120 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -1,5 +1,12 @@ -"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출""" +"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출. +PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) stage 를 별 consumer 로 분리. +대형 PDF split 변환(수십 분) 이 단일 consume_queue 코루틴을 점유해 전 파이프라인을 +stall 시키던 문제 제거. consume_queue = markdown 제외 전 stage / consume_markdown_queue = +markdown 전용. 두 consumer 의 stage 집합은 disjoint 이라 같은 row 경합/중복 reset 없음. +""" + +import os from datetime import datetime, timedelta, timezone from sqlalchemy import select, update, delete, exists @@ -18,16 +25,32 @@ logger = setup_logger("queue_consumer") BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1} STALE_THRESHOLD_MINUTES = 10 +# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다. +# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분 +# stale 임계로 보면 살아있는 변환을 stale 로 오인 → pending 복구해 이중 처리한다. +# 따라서 markdown consumer 는 별도의 generous 임계를 쓴다. +MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120")) + +# consume_queue(메인) 가 담당하는 stage. markdown 은 consume_markdown_queue 로 분리. +# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up). +MAIN_QUEUE_STAGES = [ + "extract", "classify", "summarize", "embed", "chunk", + "preview", "stt", "thumbnail", "deep_summary", +] +MARKDOWN_QUEUE_STAGES = ["markdown"] -async def reset_stale_items(): - """processing 상태로 오래 방치된 항목 복구 +async def reset_stale_items(stages, threshold_minutes=STALE_THRESHOLD_MINUTES): + """processing 상태로 오래 방치된 항목 복구 (지정 stage 한정) 1) 같은 (document_id, stage)에 pending 행이 이미 있으면 stale processing 행은 중복이므로 삭제 2) pending이 없는 stale processing 행만 pending으로 되돌림 + + stages: 이 reset 의 대상 stage 목록. consume_queue 와 consume_markdown_queue 가 + 서로 disjoint 한 stage 집합 + 서로 다른 threshold 로 호출한다. """ - cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES) + cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minutes) processing_row = aliased(ProcessingQueue) pending_row = aliased(ProcessingQueue) @@ -39,6 +62,7 @@ async def reset_stale_items(): .where( ProcessingQueue.id.in_( select(processing_row.id).where( + processing_row.stage.in_(stages), processing_row.status == "processing", processing_row.started_at.is_not(None), processing_row.started_at < cutoff, @@ -57,6 +81,7 @@ async def reset_stale_items(): # Step B: pending 없는 stale processing만 pending으로 복구 recoverable_ids = select(processing_row.id).where( + processing_row.stage.in_(stages), processing_row.status == "processing", processing_row.started_at.is_not(None), processing_row.started_at < cutoff, @@ -81,13 +106,13 @@ async def reset_stale_items(): recovered = update_result.rowcount or 0 if deleted > 0: logger.warning( - "deleted %s stale processing rows that already had pending duplicates", - deleted, + "deleted %s stale processing rows that already had pending duplicates (stages=%s)", + deleted, stages, ) if recovered > 0: logger.warning( - "recovered %s stale processing rows back to pending", - recovered, + "recovered %s stale processing rows back to pending (stages=%s)", + recovered, stages, ) except IntegrityError: logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle") @@ -142,8 +167,8 @@ async def enqueue_next_stage(document_id: int, current_stage: str): await session.commit() -async def consume_queue(): - """큐에서 pending 항목을 가져와 stage별 워커 실행""" +def _load_workers(): + """stage → worker process 함수 dict (lazy import — 순환 import 회피).""" from workers.classify_worker import process as classify_process from workers.chunk_worker import process as chunk_process from workers.deep_summary_worker import process as deep_summary_process @@ -155,7 +180,7 @@ async def consume_queue(): from workers.thumbnail_worker import process as thumbnail_process from workers.marker_worker import process as marker_process - workers = { + return { "extract": extract_process, "classify": classify_process, "summarize": summarize_process, @@ -168,92 +193,125 @@ async def consume_queue(): # next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립). "deep_summary": deep_summary_process, # Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립). + # consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록). "markdown": marker_process, } + +async def _process_stage(stage, worker_fn): + """단일 stage 의 pending 항목을 batch 만큼 가져와 워커 실행. + + consume_queue / consume_markdown_queue 가 공유한다. 항목별 독립 세션 + + processing→completed/failed 상태 전이 + 재시도 정책은 기존 로직 그대로. + """ + batch_size = BATCH_SIZE.get(stage, 3) + + # pending 항목 조회 + async with async_session() as session: + result = await session.execute( + select(ProcessingQueue.id, ProcessingQueue.document_id) + .where( + ProcessingQueue.stage == stage, + ProcessingQueue.status == "pending", + ) + .order_by(ProcessingQueue.created_at) + .limit(batch_size) + ) + pending_items = result.all() + + # 각 항목을 독립 세션에서 처리 + for queue_id, document_id in pending_items: + # 상태를 processing으로 변경 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) + if not item or item.status != "pending": + continue + item.status = "processing" + item.started_at = datetime.now(timezone.utc) + item.attempts += 1 + await session.commit() + + # 워커 실행 (독립 세션) + try: + # note(메모)는 이미 extracted_text가 있으므로 extract/preview skip + if stage in ("extract", "preview"): + from models.document import Document + async with async_session() as check_session: + doc = await check_session.get(Document, document_id) + if doc and doc.file_type == "note": + async with async_session() as skip_session: + item = await skip_session.get(ProcessingQueue, queue_id) + if item: + item.status = "completed" + item.completed_at = datetime.now(timezone.utc) + await skip_session.commit() + await enqueue_next_stage(document_id, stage) + logger.info(f"[{stage}] document_id={document_id} skip (note)") + continue + + async with async_session() as worker_session: + await worker_fn(document_id, worker_session) + await worker_session.commit() + + # 완료 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) + if not item: + logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") + continue + item.status = "completed" + item.completed_at = datetime.now(timezone.utc) + await session.commit() + + await enqueue_next_stage(document_id, stage) + logger.info(f"[{stage}] document_id={document_id} 완료") + + except Exception as e: + # 실패 처리 + async with async_session() as session: + item = await session.get(ProcessingQueue, queue_id) + if not item: + logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") + continue + # 빈 메시지 방어: str → repr → 클래스명 순 fallback + err_text = str(e) or repr(e) or type(e).__name__ + item.error_message = err_text[:500] + if item.attempts >= item.max_attempts: + item.status = "failed" + logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") + else: + item.status = "pending" + item.started_at = None + logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") + await session.commit() + + +async def consume_queue(): + """메인 큐 소비자 — markdown 제외 전 stage 를 1분 간격으로 처리.""" + workers = _load_workers() + try: - await reset_stale_items() + await reset_stale_items(MAIN_QUEUE_STAGES, STALE_THRESHOLD_MINUTES) except Exception: logger.exception("stale reset failed, but continuing queue consumption") - for stage, worker_fn in workers.items(): - batch_size = BATCH_SIZE.get(stage, 3) + for stage in MAIN_QUEUE_STAGES: + await _process_stage(stage, workers[stage]) - # pending 항목 조회 - async with async_session() as session: - result = await session.execute( - select(ProcessingQueue.id, ProcessingQueue.document_id) - .where( - ProcessingQueue.stage == stage, - ProcessingQueue.status == "pending", - ) - .order_by(ProcessingQueue.created_at) - .limit(batch_size) - ) - pending_items = result.all() - # 각 항목을 독립 세션에서 처리 - for queue_id, document_id in pending_items: - # 상태를 processing으로 변경 - async with async_session() as session: - item = await session.get(ProcessingQueue, queue_id) - if not item or item.status != "pending": - continue - item.status = "processing" - item.started_at = datetime.now(timezone.utc) - item.attempts += 1 - await session.commit() +async def consume_markdown_queue(): + """markdown 전용 큐 소비자 — 대형 PDF split 변환을 메인 파이프라인과 분리. - # 워커 실행 (독립 세션) - try: - # note(메모)는 이미 extracted_text가 있으므로 extract/preview skip - if stage in ("extract", "preview"): - from models.document import Document - async with async_session() as check_session: - doc = await check_session.get(Document, document_id) - if doc and doc.file_type == "note": - async with async_session() as skip_session: - item = await skip_session.get(ProcessingQueue, queue_id) - if item: - item.status = "completed" - item.completed_at = datetime.now(timezone.utc) - await skip_session.commit() - await enqueue_next_stage(document_id, stage) - logger.info(f"[{stage}] document_id={document_id} skip (note)") - continue + 한 doc 변환이 수십 분 걸려도 메인 consume_queue 는 영향받지 않는다. + APScheduler max_instances=1(기본) 이므로 변환 진행 중엔 이 consumer 의 + 다음 fire 만 coalesce 된다(동시 marker 변환 2건 방지 — 의도된 직렬화). + """ + workers = _load_workers() - async with async_session() as worker_session: - await worker_fn(document_id, worker_session) - await worker_session.commit() + try: + await reset_stale_items(MARKDOWN_QUEUE_STAGES, MARKDOWN_STALE_THRESHOLD_MINUTES) + except Exception: + logger.exception("markdown stale reset failed, but continuing queue consumption") - # 완료 처리 - async with async_session() as session: - item = await session.get(ProcessingQueue, queue_id) - if not item: - logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") - continue - item.status = "completed" - item.completed_at = datetime.now(timezone.utc) - await session.commit() - - await enqueue_next_stage(document_id, stage) - logger.info(f"[{stage}] document_id={document_id} 완료") - - except Exception as e: - # 실패 처리 - async with async_session() as session: - item = await session.get(ProcessingQueue, queue_id) - if not item: - logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip") - continue - # 빈 메시지 방어: str → repr → 클래스명 순 fallback - err_text = str(e) or repr(e) or type(e).__name__ - item.error_message = err_text[:500] - if item.attempts >= item.max_attempts: - item.status = "failed" - logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}") - else: - item.status = "pending" - item.started_at = None - logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}") - await session.commit() + for stage in MARKDOWN_QUEUE_STAGES: + await _process_stage(stage, workers[stage])